OpenStack 云平台流量监控插件tap-as-a-service(Taas)代码解析(二):
在上一篇文章中,以create_tap_service為例,講解了OpenStack中云端流量捕獲插件Tap-as-a-service的Plugin的代碼流程
(https://blog.csdn.net/m0_37313888/article/details/82693789)
先回顧一下Taas的結構,這篇文章我將繼續講解Taas Agent與Taas Driver的工作流程
(neutron對應的OpenStack版本:Queuens,關于如何在生產環境中的openstack上安裝tap-as-a-service插件請看我的另一篇文章https://blog.csdn.net/m0_37313888/article/details/83450245)
1.Taas插件加載與初始化
眾所周知,OpenStack中Plugin只負責數據庫以及消息隊列的維護,工作具體是在agent節點上執行的。taas作為在這里是作為OpenvSwitch的一個extension集成到neutron-openvswitch-agent中,并且在neutron-openvswitch-agent啟動時進行加載與初始化。
首先看一看taas 在agent上的初始化過程
在neutron-openvswitch-agent加載時,順便把taas插件加載的過程:
1)插件加載與實例化
在neutron-openvswitch-agent初始化代碼中(代碼位置:neutron/plugins/ml2/driver/openvswitch/agent/ovs_neutron_agent.py)
,第2290行附近,有這么一段代碼,這段代碼將實例從配置文件中讀出并且添加到ext_mgr作為ext_mgr的一個成員對象。
2288 ext_manager.register_opts(cfg.CONF) 2289 2290 ext_mgr = ext_manager.L2AgentExtensionsManager(cfg.CONF)對應到neutron/agent/l2/l2_agent_extensions_manager.py中的下面這段代碼
21 L2_AGENT_EXT_MANAGER_NAMESPACE = 'neutron.agent.l2.extensions' 22 23 24 def register_opts(conf): 25 agent_ext_mgr_config.register_agent_ext_manager_opts(conf)neutron/conf/agent/agent_extensions_manager.py
17 AGENT_EXT_MANAGER_OPTS = [ 18 cfg.ListOpt('extensions', 19 default=[], 20 help=_('Extensions list to use')), 21 ] 22 23 24 def register_agent_ext_manager_opts(cfg=cfg.CONF): 25 cfg.register_opts(AGENT_EXT_MANAGER_OPTS, 'agent')代碼是直接讀了配置文件中agent部分的extensions選項。具體的配置過程請閱讀https://blog.csdn.net/m0_37313888/article/details/83450245
加載插件的具體位置如下,加載下面這個插件就是創建了下面這個類的一個實例。
[neutron.agent.l2.extensions] taas = neutron_taas.services.taas.agents.extensions.taas:TaasAgentExtension2)插件的初始化
插件的加載只是在內存中生成了一個實例。但是這個插件實例還沒有具體發揮它的作用,這個插件實例在OVSNeutronAgent的運行過程中才真正發揮它的作用。在neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py中插件管理器作為了OVSNeutronAgent的成員參與到了OVSNeutronAgent的工作中.(下圖中的ext_mgr就是包含了taas的插件管理器)
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
2297 try: 2298 agent = OVSNeutronAgent(bridge_classes, ext_mgr, cfg.CONF) 2299 capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent) 2300 except (RuntimeError, ValueError) as e: 2301 LOG.error("%s Agent terminated!", e) 2302 sys.exit(1) 2303 agent.daemon_loop()插件管理器被傳遞到OVSNeutronAgent的成員變量中(下圖)
140 self.ext_manager = ext_manager之后,還是在這個代碼中,進行下面的操作:1.綁定agent_api操作2.初始化所有的插件
219 agent_api = ovs_ext_api.OVSAgentExtensionAPI(self.int_br, self.tun_br) 220 self.ext_manager.initialize( 221 self.connection, constants.EXTENSION_DRIVER_TYPE, agent_api)查看具體的 代碼實現如下:
class AgentExtensionsManager(stevedore.named.NamedExtensionManager):"""Manage agent extensions."""def __init__(self, conf, namespace):super(AgentExtensionsManager, self).__init__(namespace, conf.agent.extensions,invoke_on_load=True, name_order=True)LOG.info("Loaded agent extensions: %s", self.names())def initialize(self, connection, driver_type, agent_api=None):# Initialize each agent extension in the list.for extension in self:LOG.info("Initializing agent extension '%s'", extension.name)# If the agent has provided an agent_api object, this object will# be passed to all interested extensions. This object must be# consumed by each such extension before the extension's# initialize() method is called, as the initialization step# relies on the agent_api already being available.extension.obj.consume_api(agent_api)extension.obj.initialize(connection, driver_type)taas-agent-extension在這段代碼中被加載,我們來看看taas-agent-extension做了啥
class TaasAgentExtension(l2_extension.L2AgentExtension):def initialize(self, connection, driver_type):"""Initialize agent extension."""self.taas_agent = taas_ovs_agent.TaasOvsAgentRpcCallback(cfg.CONF, driver_type)self.taas_agent.consume_api(self.agent_api)self.taas_agent.initialize()def consume_api(self, agent_api):"""Receive neutron agent API objectAllows an extension to gain access to resources internal to theneutron agent and otherwise unavailable to the extension."""self.agent_api = agent_apidef handle_port(self, context, port):passdef delete_port(self, context, port):pass三件事:
1. 加載本地rpc回調函數類: TaasOvsAgentRpcCallback
2. 加載agent_api: ovs_ext_api.OVSAgentExtensionAPI(self.int_br, self.tun_br)?
3.?初始化本地rpc回調函數類?
本地的rpc回調函數類如下:
class TaasOvsAgentRpcCallback(api.TaasAgentRpcCallbackMixin):def __init__(self, conf, driver_type):LOG.debug("TaaS OVS Agent initialize called")self.conf = confself.driver_type = driver_typesuper(TaasOvsAgentRpcCallback, self).__init__()def initialize(self):self.taas_driver = manager.NeutronManager.load_class_for_provider('neutron_taas.taas.agent_drivers', self.driver_type)()self.taas_driver.consume_api(self.agent_api)self.taas_driver.initialize()self._taas_rpc_setup()TaasOvsAgentService(self).start()def consume_api(self, agent_api):self.agent_api = agent_apidef _invoke_driver_for_plugin_api(self, context, args, func_name):........try:self.taas_driver.__getattribute__(func_name)(args)except Exception:LOG.debug("Failed to invoke the driver")returndef create_tap_service(self, context, tap_service, host):........def create_tap_flow(self, context, tap_flow_msg, host):........def delete_tap_service(self, context, tap_service, host):........def delete_tap_flow(self, context, tap_flow_msg, host):........def _taas_rpc_setup(self):# setup RPC to msg taas pluginself.taas_plugin_rpc = TaasOvsPluginApi(topics.TAAS_PLUGIN, self.conf.host)endpoints = [self]conn = n_rpc.Connection()conn.create_consumer(topics.TAAS_AGENT, endpoints, fanout=False)conn.consume_in_threads()def periodic_tasks(self):........我們來重點看一看上面函數中的initialize()部分的代碼,這個是agent上的taas-extension的rpc代碼的初始化的核心代碼
1.?
self.taas_driver = manager.NeutronManager.load_class_for_provider('neutron_taas.taas.agent_drivers', self.driver_type)()看一看NeutronManager.load_class_for_provider
@staticmethoddef load_class_for_provider(namespace, plugin_provider):"""Loads plugin using alias or class name:param namespace: namespace where alias is defined:param plugin_provider: plugin alias or class name:returns: plugin that is loaded:raises ImportError: if fails to load plugin"""try:return runtime.load_class_by_alias_or_classname(namespace,plugin_provider)except ImportError:with excutils.save_and_reraise_exception():LOG.error("Plugin '%s' not found.", plugin_provider)runtime.load_class_by_alias_or_classname(namespace,? plugin_provider)如下:
def load_class_by_alias_or_classname(namespace, name):if not name:LOG.error("Alias or class name is not set")raise ImportError(_("Class not found."))try:mgr = driver.DriverManager(namespace, name, warn_on_missing_entrypoint=False)class_to_load = mgr.driverexcept RuntimeError:e1_info = sys.exc_info()# Fallback to class nametry:class_to_load = importutils.import_class(name)except (ImportError, ValueError):LOG.error("Error loading class by alias",exc_info=e1_info)LOG.error("Error loading class by class name",exc_info=True)raise ImportError(_("Class not found."))return class_to_load通過stevedore.driver.DriverManager類動態加載模塊。這是一個與動態加載類有關的模塊。關于這個模塊的使用,可以看看這篇文章:http://www.360doc.com/content/14/0429/19/9482_373285413.shtml
然后self.taas_driver通過給出的namespace:
neutron_taas.taas.agent_drivers與 driver_type:
ovs? 被加載。具體被加載的類寫在了setup.cfg的entry_points里面
[entry_points] neutron.agent.l2.extensions =taas = neutron_taas.services.taas.agents.extensions.taas:TaasAgentExtension neutron_taas.taas.agent_drivers =ovs = neutron_taas.services.taas.drivers.linux.ovs_taas:OvsTaasDriver neutron.service_plugins =taas = neutron_taas.services.taas.taas_plugin:TaasPlugin neutron.db.alembic_migrations =tap-as-a-service = neutron_taas.db.migration:alembic_migration tempest.test_plugins =tap-as-a-service = neutron_taas.tests.tempest_plugin.plugin:NeutronTaaSPlugin neutronclient.extension =tap_service = neutron_taas.taas_client.tapservicetap_flow = neutron_taas.taas_client.tapflow即,self.taas_driver?=?
neutron_taas.services.taas.drivers.linux.ovs_taas:OvsTaasDriver2.
self.taas_driver.consume_api(self.agent_api)在self.taas_driver, 即neutron_taas.services.taas.drivers.linux.ovs_taas:OvsTaasDriver中,consume_api()方法如下:
def consume_api(self, agent_api):self.agent_api = agent_api簡單的增加了一個self.agent_api成員
3.
self.taas_driver.initialize()還是看一下self.taas_driver,
def initialize(self):self.int_br = self.agent_api.request_int_br()self.tun_br = self.agent_api.request_tun_br()self.tap_br = OVSBridge_tap_extension('br-tap', self.root_helper)# Prepare OVS bridges for TaaSself.setup_ovs_bridges()# Setup key-value manager for ingress BCMC flowsself.bcmc_kvm = taas_ovs_utils.key_value_mgr(4096)這個時候agent_api就派上用場了!這個對象,顧名思義,就是一個agent所映射產生的一個api對象,提供許多訪問本地資源的接口,在ovs中,是本地所建立的網橋與相關網絡設備。我們看到,先是獲取了int-br網橋與tun_br 網橋,來看一看獲取網橋的基本操作,這里agent_api是ovs_ext_api.OVSAgentExtensionAPI(self.int_br, self.tun_br)?
如下
class OVSAgentExtensionAPI(object):'''Implements the Agent API for Open vSwitch agent.Extensions can gain access to this API by overriding the consume_apimethod which has been added to the AgentExtension class.'''def __init__(self, int_br, tun_br):super(OVSAgentExtensionAPI, self).__init__()self.br_int = int_brself.br_tun = tun_brdef request_int_br(self):"""Allows extensions to request an integration bridge to use forextension specific flows."""return OVSCookieBridge(self.br_int)def request_tun_br(self):"""Allows extensions to request a tunnel bridge to use forextension specific flows.If tunneling is not enabled, this method will return None."""if not self.br_tun:return Nonereturn OVSCookieBridge(self.br_tun)這里OVSCookieBridge讓我們想到了web開發中的cookie,但是這個其實就是把本地的網橋信息淺拷貝一份然后返回給程序進一步使用
class OVSCookieBridge(object):def __new__(cls, bridge):cookie_bridge = bridge.clone()cookie_bridge.set_agent_uuid_stamp(bridge.request_cookie())return cookie_bridgedef __init__(self, bridge):pass def clone(self):'''Used by OVSCookieBridge, can be overridden by subclasses if abehavior different from copy.copy is needed.'''return copy.copy(self)接著,然后建立了一個tap網橋。這一步是在agent上實現流量拷貝、遷移、以及重定向的重頭戲。
self.setup_ovs_bridges()具體代碼在neutron_taas/services/taas/drivers/linux/ovs_taas.py
這里為了直觀的展示這個流程,把流程分為4個部分
1.創建一個tap網橋
剛開始我看這一段的時候也很是迷惑,因為ovs有專門的流量鏡像功能啊,為何不直接管理ovs做一個流量鏡像呢?原來,tap-as--a-service的主要設計理念是為不同節點上的虛擬機提供統一的流量捕獲服務,這樣的話方案中流量必須能夠方便的進行跨主機遷移。如果采用vxlan隧道的話就很容易建立新的流量隧道實現流量的跨節點遷移。換句話說,流量既能夠定向到相同宿主機上的另一臺虛擬機,也能重定向到另一臺宿主機上的另一臺虛擬機!
self.tap_br.create()# Connect br-tap to br-int and br-tunself.int_br.add_patch_port('patch-int-tap', 'patch-tap-int')self.tap_br.add_patch_port('patch-tap-int', 'patch-int-tap')self.tun_br.add_patch_port('patch-tun-tap', 'patch-tap-tun')self.tap_br.add_patch_port('patch-tap-tun', 'patch-tun-tap')# Get patch port IDspatch_tap_int_id = self.tap_br.get_port_ofport('patch-tap-int')patch_tap_tun_id = self.tap_br.get_port_ofport('patch-tap-tun')patch_tun_tap_id = self.tun_br.get_port_ofport('patch-tun-tap')2.刪除預先定義的與tap-as-a-service相關的所有規則
self.tap_br.delete_flows(table=0)self.tap_br.delete_flows(table=taas_ovs_consts.TAAS_RECV_LOC)self.tap_br.delete_flows(table=taas_ovs_consts.TAAS_RECV_REM)self.tun_br.delete_flows(table=0,in_port=patch_tun_tap_id)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_SEND_UCAST)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_SEND_FLOOD)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_CLASSIFY)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_DST_CHECK)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_SRC_CHECK)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_DST_RESPOND)self.tun_br.delete_flows(table=taas_ovs_consts.TAAS_SRC_RESPOND)3.初始化tap網橋的OpenFlow流表
self.tap_br.add_flow(table=0,priority=1,in_port=patch_tap_int_id,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_RECV_LOC)self.tap_br.add_flow(table=0,priority=1,in_port=patch_tap_tun_id,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_RECV_REM)self.tap_br.add_flow(table=0,priority=0,actions="drop")self.tap_br.add_flow(table=taas_ovs_consts.TAAS_RECV_LOC,priority=0,actions="output:%s" % str(patch_tap_tun_id))self.tap_br.add_flow(table=taas_ovs_consts.TAAS_RECV_REM,priority=0,actions="drop")這段代碼可能看起來比較吃力,我在下面做出一些解釋:
里面出現了 taas_ovs_consts.TAAS_RECV_LOC 與?taas_ovs_consts.TAAS_RECV_REM兩張表。結合后面的代碼仔細一看,TAAS_RECV_LOC負責處理從br-int過來的流量(流出vm的流量),默認為轉發到br-tun;TAAS_RECV_REM負責處理從br-tun過來的流量,默認為不讓任何來自于br-tun的流量通過。來自其他網橋的流量也默認不通過。
4.初始化br-tun網橋的OpenFlow流表
self.tun_br.add_flow(table=0,priority=1,in_port=patch_tun_tap_id,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_SEND_UCAST)self.tun_br.add_flow(table=taas_ovs_consts.TAAS_SEND_UCAST,priority=0,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_SEND_FLOOD)flow_action = self._create_tunnel_flood_flow_action()if flow_action != "":self.tun_br.add_flow(table=taas_ovs_consts.TAAS_SEND_FLOOD,priority=0,actions=flow_action)self.tun_br.add_flow(table=taas_ovs_consts.TAAS_CLASSIFY,priority=2,reg0=0,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_DST_CHECK)self.tun_br.add_flow(table=taas_ovs_consts.TAAS_CLASSIFY,priority=1,reg0=1,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_DST_CHECK)self.tun_br.add_flow(table=taas_ovs_consts.TAAS_CLASSIFY,priority=1,reg0=2,actions="resubmit(,%s)" %taas_ovs_consts.TAAS_SRC_CHECK)self.tun_br.add_flow(table=taas_ovs_consts.TAAS_DST_CHECK,priority=0,actions="drop")self.tun_br.add_flow(table=taas_ovs_consts.TAAS_SRC_CHECK,priority=0,actions="drop")self.tun_br.add_flow(table=taas_ovs_consts.TAAS_DST_RESPOND,priority=2,reg0=0,actions="output:%s" % str(patch_tun_tap_id))self.tun_br.add_flow(table=taas_ovs_consts.TAAS_DST_RESPOND,priority=1,reg0=1,actions=("output:%s,""move:NXM_OF_VLAN_TCI[0..11]->NXM_NX_TUN_ID""[0..11],mod_vlan_vid:2,output:in_port" %str(patch_tun_tap_id)))self.tun_br.add_flow(table=taas_ovs_consts.TAAS_SRC_RESPOND,priority=1,actions=("learn(table=%s,hard_timeout=60,""priority=1,NXM_OF_VLAN_TCI[0..11],""load:NXM_OF_VLAN_TCI[0..11]->NXM_NX_TUN_ID""[0..11],load:0->NXM_OF_VLAN_TCI[0..11],""output:NXM_OF_IN_PORT[])" %taas_ovs_consts.TAAS_SEND_UCAST))return以上就是給計算節點開機后,taas插件的初始化過程。關于在實際使用時流表的配置,以及為什么要這么配置,我后期會專門寫一篇文章來講解。
總結
以上是生活随笔為你收集整理的OpenStack 云平台流量监控插件tap-as-a-service(Taas)代码解析(二):的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Win7系统的笔记本u盘数据如何恢复
- 下一篇: 《炬丰科技-半导体工艺》湿法刻蚀硅片表面