在系统设计中,经常我们希望设计一套插件机制,在不修改程序主体情况下,动态去加载附能。
我设想的插件系统:
1、通过类来实现
2、自动查找和导入
我们假设需要实现一个简单的插件系统,插件可以接收一个参数执行。
实现基础插件类
我们先构建一个基础插件类:plugin_collection.py
class Plugin:
"""
该基类每个插件都需要继承,插件需要实现基类定义的方法"""
def __init__(self):
self.description = '未知'
def perform_operation(self, argument):
"""
实际执行插件所执行的方法,该方法所有插件类都需要实现
"""
raise NotImplementedError
所有的插件类需要申明description来进行插件描述,并且要实现perform_operation方法,该方法是实际加载插件将去执行的方法。
简易插件
我们现在实现一个插件,实际执行时仅返回传入的参数: plugins/identity.py
import plugin_collection
class Identity(plugin_collection.Plugin):
"""
This plugin is just the identity function: it returns the argument
"""
def __init__(self):
super().__init__()
self.description = 'Identity function'
def perform_operation(self, argument):
"""
The actual implementation of the identity plugin is to just return the
argument
"""
return argument
动态加载机制
因为我们预实现动态加载插件。我们通过定义一个PluginCollection来完成该职责,它将载入所有的插件,并且根据传入的值执行perform_operation方法。PluginCollection类基础组件实现如下:plugins_collection.py
class PluginCollection:
"""
该类会通过传入的package查找继承了Plugin类的插件类
"""
def __init__(self, plugin_package):
self.plugin_package = plugin_package
self.reload_plugins()
def reload_plugins(self):
"""
重置plugins列表,遍历传入的package查询有效的插件
"""
self.plugins = []
self.seen_paths = []
print()
print(f"在 {self.plugin_package} 包里查找插件")
self.walk_package(self.plugin_package)
def apply_all_plugins_on_value(self, argument):
print()
print(f"执行参数 {argument} 到所有的插件:")
for plugin in self.plugins:
print(f" 执行 {plugin.description} 参数 {argument} 结果 {plugin.perform_operation(argument)}")
最关键的是PluginCollection类里的walk_package方法,该方法按如下步骤操作:
1、操作package里所有的模块
2、针对找到的模块,检查是否是Plugin的子类,非Plugin自身。每个插件将会初始化并加入到列表。该检查的好处是你可以放入其他Python模块,也并不影响插件的使用
3、检查当前package下的子目录,递归查找插件
def walk_package(self, package):
"""
递归遍历包里获取所有的插件
"""
imported_package = __import__(package, fromlist=['blah'])
for _, pluginname, ispkg in pkgutil.iter_modules(imported_package.__path__, imported_package.__name__ + '.'):
if not ispkg:
plugin_module = __import__(pluginname, fromlist=['blah'])
clsmembers = inspect.getmembers(plugin_module, inspect.isclass)
for (_, c) in clsmembers:
# 仅加入Plugin类的子类,忽略掉Plugin本身
if issubclass(c, Plugin) and (c is not Plugin):
print(f' 找到插件类: {c.__module__}.{c.__name__}')
self.plugins.append(c())
# 现在我们已经查找了当前package中的所有模块,现在我们递归查找子packages里的附件模块
all_current_paths = []
if isinstance(imported_package.__path__, str):
all_current_paths.append(imported_package.__path__)
else:
all_current_paths.extend([x for x in imported_package.__path__])
for pkg_path in all_current_paths:
if pkg_path not in self.seen_paths:
self.seen_paths.append(pkg_path)
# 获取当前package中的子目录
child_pkgs = [p for p in os.listdir(pkg_path) if os.path.isdir(os.path.join(pkg_path, p))]
# 递归遍历子目录的package
for child_pkg in child_pkgs:
self.walk_package(package + '.' + child_pkg)
测试
现在我们写个简单的测试:test.py
from plugin_collection import PluginCollection
my_plugins = PluginCollection('plugins')
my_plugins.apply_all_plugins_on_value(5)
执行结果:
$ python3 test.py
在 plugins 包里查找插件
找到插件类: plugins.identity.Identity
执行参数 5 到所有的插件:
执行 Identity function 参数 5 结果 5