蓝月亮四肖中特

about云開發

 找回密碼
 立即注冊

QQ登錄

只需一步,快速開始

打印 上一主題 下一主題

Ceilometer項目源碼分析----ceilometer-agent-notification服務的初始化和啟動

[復制鏈接]
跳轉到指定樓層
樓主
坎蒂絲_Swan 發表于 2014-12-14 19:06:41 | 只看該作者 回帖獎勵 |倒序瀏覽 |閱讀模式
本帖最后由 坎蒂絲_Swan 于 2014-12-14 19:45 編輯

問題導讀
問題1:服務ceilometer-agent-notification的初始化操作實現了哪些操作?
問題2:服務ceilometer-agent-notification的啟動操作實現了哪些任務?








ceilometer-agent-notification服務的初始化和啟動

    本篇帖子將解析服務組件ceilometer-agent-compute的初始化和啟動操作。ceilometer-agent-notification服務組件實現訪問oslo-messaging,openstack中各個模塊都會推送通知(notification)信息到oslo-messaging消息框架,ceilometer-agent-notification通過訪問這個消息隊列服務框架,獲取相關通知信息,并進一步轉化為采樣數據的格式。從消息隊列服務框架獲取通知信息,并進一步獲取采樣數據信息,可以理解為被動獲取監控數據操作,需要一直監聽oslo-messaging消息隊列。

    來看方法/ceilometer/cli.py----def agent_notification,這個方法即實現了ceilometer-agent-notification服務的初始化和啟動操作。

  1. def agent_notification():  
  2.     service.prepare_service()  
  3.     launcher = os_service.ProcessLauncher()  
  4.     launcher.launch_service(  
  5.         notification.NotificationService(cfg.CONF.host,'ceilometer.agent.notification'),  
  6.         # workers默認值為1;  
  7.         workers=service.get_workers('notification'))  
  8.     launcher.wait()  
復制代碼

1 服務ceilometer-agent-notification的初始化操作
服務ceilometer-agent-notification的初始化操作主要實現了以下內容的操作:

(1)若干參數的初始化,定義了所要監聽序列的host和topic;

(2)建立線程池,用于后續服務中若干操作的運行;

class Service(service.Service)----def __init__

  1. class Service(service.Service):  
  2.     def __init__(self, host, topic, manager=None, serializer=None):  
  3.         """
  4.         NotificationService(cfg.CONF.host,'ceilometer.agent.notification')
  5.         host:cfg.CONF.host
  6.         topic:'ceilometer.agent.notification'
  7.         """  
  8.         super(Service, self).__init__()  
  9.         self.host = host  
  10.         self.topic = topic  
  11.         self.serializer = serializer  
  12.         if manager is None:  
  13.             self.manager = self  
  14.         else:  
  15.             self.manager = manager  
復制代碼

class Service(object)----def __init__
  1. class Service(object):  
  2.     def __init__(self, threads=1000):  
  3.         self.tg = threadgroup.ThreadGroup(threads)  
  4.         self._done = event.Event()  
復制代碼


2 服務ceilometer-agent-notification的啟動操作

服務ceilometer-agent-notification的啟動操作實現了以下任務:

(1)加載命名空間'ceilometer.dispatcher'中的插件;


(2)為RPC通信建立到信息總線的連接,建立指定類型的消息消費者;

(3)啟動協程實現啟動啟動消費者線程,等待并消費處理隊列'ceilometer.agent.notification'中的消息;

(4)連接到消息總線來獲取通知信息;實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;

(5)從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;從通知獲取采樣數據信息,可以理解為被動獲取數據操作;

class NotificationService----def start

  1. class NotificationService(service.DispatchedService, rpc_service.Service):  
  2.     NOTIFICATION_NAMESPACE = 'ceilometer.notification'  
  3.     def start(self):  
  4.         super(NotificationService, self).start()  
  5.         # Add a dummy thread to have wait() working  
  6.         self.tg.add_timer(604800, lambda: None)  
復制代碼


class DispatchedService----def start

加載命名空間'ceilometer.dispatcher'中的插件:
ceilometer.dispatcher =
    database = ceilometer.dispatcher.database:DatabaseDispatcher
    file = ceilometer.dispatcher.file:FileDispatcher


  1. class DispatchedService(object):  
  2.     DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'  
  3.     def start(self):  
  4.         """
  5.         加載命名空間'ceilometer.dispatcher'中的插件:
  6.         ceilometer.dispatcher =
  7.         database = ceilometer.dispatcher.database:DatabaseDispatcher
  8.         file = ceilometer.dispatcher.file:FileDispatcher
  9.         """  
  10.         super(DispatchedService, self).start()  
  11.         LOG.debug(_('loading dispatchers from %s'),  
  12.                   self.DISPATCHER_NAMESPACE)  
  13.          
  14.         self.dispatcher_manager = named.NamedExtensionManager(  
  15.             # self.DISPATCHER_NAMESPACE = ceilometer.dispatcher  
  16.             namespace=self.DISPATCHER_NAMESPACE,  
  17.             # cfg.CONF.dispatcher = ['database']  
  18.             names=cfg.CONF.dispatcher,  
  19.             invoke_on_load=True,  
  20.             invoke_args=[cfg.CONF])  
  21.         if not list(self.dispatcher_manager):  
  22.             LOG.warning(_('Failed to load any dispatchers for %s'),  
  23.                         self.DISPATCHER_NAMESPACE)  
復制代碼

class Service(service.Service)----def start

這個方法主要完成了以下步驟的內容操作:

(1)為RPC通信建立到信息總線的連接,建立指定類型的消息消費者;

(2)啟動協程實現啟動啟動消費者線程,等待并消費處理隊列'ceilometer.agent.notification'中的消息;

(3)連接到消息總線來獲取通知信息;實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;

(4)從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;從通知獲取采樣數據信息,可以理解為被動獲取數據操作;

注:第(3)(4)步驟是通過執行方法initialize_service_hook實現的;


  1. class Service(service.Service):  
  2.     def start(self):  
  3.         """
  4.         為RPC通信建立到信息總線的連接;
  5.         1.建立指定類型的消息消費者;        
  6.         2.執行方法initialize_service_hook;
  7.         3.啟動協程實現等待并消費處理隊列中的消息;
  8.         """  
  9.         super(Service, self).start()  
  10.   
  11.         """
  12.         為RPC通信建立到信息總線的連接;
  13.         建立一個新的連接,或者從連接池中獲取一個;
  14.         """  
  15.         self.conn = rpc.create_connection(new=True)  
  16.         LOG.debug(_("Creating Consumer connection for Service %s") %  
  17.                   self.topic)  
  18.   
  19.         """
  20.         RpcDispatcher:RPC消息調度類;
  21.         """  
  22.         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],  
  23.                                                   self.serializer)  
  24.   
  25.         # Share this same connection for these Consumers  
  26.         """
  27.         create_consumer:建立指定類型的消息消費者(fanout or topic);
  28.         1.創建以服務的topic為路由鍵的消費者;
  29.         2.創建以服務的topic和本機名為路由鍵的消費者
  30.           (基于topic&host,可用來接收定向消息);
  31.         3.fanout直接投遞消息,不進行匹配,速度最快
  32.           (fanout類型,可用于接收廣播消息);
  33.         """  
  34.         self.conn.create_consumer(self.topic, dispatcher, fanout=False)  
  35.         node_topic = '%s.%s' % (self.topic, self.host)  
  36.         self.conn.create_consumer(node_topic, dispatcher, fanout=False)  
  37.         self.conn.create_consumer(self.topic, dispatcher, fanout=True)  
  38.   
  39.         # Hook to allow the manager to do other initializations after  
  40.         # the rpc connection is created.  
  41.         """
  42.         在消息消費進程啟動前,必須先聲明消費者;
  43.         建立一個'topic'類型的消息消費者;
  44.         根據消費者類(TopicConsumer)和消息隊列名稱
  45.         (pool_name:  ceilometer.collector.metering)
  46.         以及指定主題topic(metering)建立消息消費者,并加入消費者列表;
  47.         """  
  48.         if callable(getattr(self.manager, 'initialize_service_hook', None)):  
  49.             self.manager.initialize_service_hook(self)  
  50.   
  51.         """
  52.         啟動消費者線程;
  53.         consume_in_thread用evelent.spawn創建一個協程一直運行;
  54.         等待消息,在有消費到來時會創建新的協程運行遠程調用的函數;
  55.         啟動協程實現等待并消費處理隊列中的消息;
  56.         """  
  57.         self.conn.consume_in_thread()  
復制代碼


下面來重點分析方法class NotificationService----def initialize_service_hook,這個方法主要實現以下步驟的內容操作:

1.獲取與命名空間ceilometer.event.trait_plugin相匹配的所有插件,并加載;
  ceilometer.event.trait_plugin =
      split = ceilometer.event.trait_plugins:SplitterTraitPlugin
      bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin

2.獲取與命名空間ceilometer.notification相匹配的所有插件,并加載;
  ceilometer.notification =
      instance = ceilometer.compute.notifications.instance:Instance
      ......
      stack_crud = ceilometer.orchestration.notifications:StackCRUD
  這些插件描述了針對各個監控項,如何從對應的通知中獲取相關監控信息并形成采樣格式;

3.連接到消息總線來獲取通知信息;
  實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;
  從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;

class NotificationService----def initialize_service_hook

  1. class NotificationService(service.DispatchedService, rpc_service.Service):  
  2.     NOTIFICATION_NAMESPACE = 'ceilometer.notification'  
  3.     def initialize_service_hook(self, service):  
  4.         '''''
  5.         主要實現的功能:
  6.         1.加載命名空間'ceilometer.notification'的所有插件:
  7.         2.遍歷上述加載的所有插件,均執行方法:_setup_subscription
  8.         注:_setup_subscription:連接到消息總線來獲取通知信息;
  9.         實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;
  10.         從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;
  11.         從通知獲取采樣數據信息,可以理解為被動獲取數據操作;
  12.         '''  
  13.         self.pipeline_manager = pipeline.setup_pipeline()  
  14.   
  15.         LOG.debug(_('Loading event definitions'))  
  16.          
  17.         """
  18.         extension.ExtensionManager:
  19.         獲取與namespace(ceilometer.event.trait_plugin)相匹配的所有插件,并加載;
  20.         ceilometer.event.trait_plugin =
  21.         split = ceilometer.event.trait_plugins:SplitterTraitPlugin
  22.         bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin
  23.         """  
  24.         self.event_converter = event_converter.setup_events(  
  25.             extension.ExtensionManager(  
  26.                 namespace='ceilometer.event.trait_plugin'))  
  27.   
  28.         """
  29.         NOTIFICATION_NAMESPACE = 'ceilometer.notification'
  30.         加載命名空間'ceilometer.notification'的插件:
  31.         """  
  32.         self.notification_manager = \  
  33.             extension.ExtensionManager(  
  34.                 namespace=self.NOTIFICATION_NAMESPACE,  
  35.                 invoke_on_load=True,  
  36.             )  
  37.   
  38.         if not list(self.notification_manager):  
  39.             LOG.warning(_('Failed to load any notification handlers for %s'),  
  40.                         self.NOTIFICATION_NAMESPACE)  
  41.          
  42.         """
  43.         連接到消息總線來獲取通知信息;
  44.         實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;
  45.         從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;
  46.         從通知獲取采樣數據信息,可以理解為被動獲取數據操作;
  47.          
  48.         遍歷上述加載的所有插件,均執行方法:
  49.         def _setup_subscription(ext, *args, **kwds)
  50.         其中ext即為遍歷的上述加載的插件;
  51.         """  
  52.         self.notification_manager.map(self._setup_subscription)
復制代碼
上述代碼主要實現了三部分的內容,下面來進行細致的分析;


步驟1:

  1. self.event_converter = event_converter.setup_events(  
  2.     extension.ExtensionManager(  
  3.         namespace='ceilometer.event.trait_plugin'))  
復制代碼

這里主要實現獲取與命名空間ceilometer.event.trait_plugin相匹配的所有插件,并加載;
ceilometer.event.trait_plugin =
        split = ceilometer.event.trait_plugins:SplitterTraitPlugin
        bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin

步驟2:

  1. self.notification_manager = \  
  2.     extension.ExtensionManager(  
  3.         namespace=self.NOTIFICATION_NAMESPACE,  
  4.         invoke_on_load=True,  
  5.     )  
復制代碼

這里主要實現獲取與命名空間ceilometer.notification相匹配的所有插件,并加載;
ceilometer.notification =
        instance = ceilometer.compute.notifications.instance:Instance
        instance_flavor = ceilometer.compute.notifications.instance:InstanceFlavor
        instance_delete = ceilometer.compute.notifications.instance:InstanceDelete
        instance_scheduled = ceilometer.compute.notifications.instance:InstanceScheduled
        memory = ceilometer.compute.notifications.instance:Memory
        vcpus = ceilometer.compute.notifications.instance:VCpus
        disk_root_size = ceilometer.compute.notifications.instance:RootDiskSize
        disk_ephemeral_size = ceilometer.compute.notifications.instance:EphemeralDiskSize
        cpu_frequency = ceilometer.compute.notifications.cpu:CpuFrequency
        cpu_user_time = ceilometer.compute.notifications.cpu:CpuUserTime
        cpu_kernel_time = ceilometer.compute.notifications.cpu:CpuKernelTime
        cpu_idle_time = ceilometer.compute.notifications.cpu:CpuIdleTime
        cpu_iowait_time = ceilometer.compute.notifications.cpu:CpuIowaitTime
        cpu_kernel_percent = ceilometer.compute.notifications.cpu:CpuKernelPercent
        cpu_idle_percent = ceilometer.compute.notifications.cpu:CpuIdlePercent
        cpu_user_percent = ceilometer.compute.notifications.cpu:CpuUserPercent
        cpu_iowait_percent = ceilometer.compute.notifications.cpu:CpuIowaitPercent
        cpu_percent = ceilometer.compute.notifications.cpu:CpuPercent
        volume = ceilometer.volume.notifications:Volume
        volume_size = ceilometer.volume.notifications:VolumeSize
        image_crud = ceilometer.image.notifications:ImageCRUD
        image = ceilometer.image.notifications:Image
        image_size = ceilometer.image.notifications:ImageSize
        image_download = ceilometer.image.notifications:ImageDownload
        image_serve = ceilometer.image.notifications:ImageServe
        network = ceilometer.network.notifications:Network
        subnet = ceilometer.network.notifications:Subnet
        port = ceilometer.network.notifications:Port
        router = ceilometer.network.notifications:Router
        floatingip = ceilometer.network.notifications:FloatingIP
        bandwidth = ceilometer.network.notifications:Bandwidth
        http.request = ceilometer.middleware:HTTPRequest
        http.response = ceilometer.middleware:HTTPResponse
        stack_crud = ceilometer.orchestration.notifications:StackCRUD
這些插件描述了針對各個監控項,如何從對應的通知中獲取相關監控信息并形成采樣格式;

步驟3:

  1. self.notification_manager.map(self._setup_subscription)  
復制代碼


這里所實現的功能是:
連接到消息總線來獲取通知信息;實際上就是實現監聽oslo-messaging消息框架中compute/image/network/heat/cinder等服務的隊列;從隊列中獲取通知信息,將通知轉換程采樣數據的格式,然后進行采樣數據的發布操作;

這條語句的執行操作是遍歷命名空間ceilometer.notification的所有插件,均執行方法:

def _setup_subscription(ext, *args, **kwds)

方法_setup_subscription解析:

方法_setup_subscription所實現的功能:

針對上述加載的命名空間ceilometer.notification中的一個插件,執行以下操作:

1.調用方法get_exchange_topics獲取插件的ExchangeTopics序列;
class ComputeNotificationBase----def get_exchange_topics;
class ImageBase----def get_exchange_topics;
class NetworkNotificationBase----def get_exchange_topics;
class StackCRUD----def get_exchange_topics;
class _Base(卷)----def get_exchange_topics;
ExchangeTopics序列描述了用于連接到所監聽隊列的交換器exchange和topics;
經過分析所獲取的exchange和topics的值為:
topics = 'notifications.info'指定所要監聽的消息隊列;
exchange = nova/glance/neutron/heat/cinder來區分獲取不同服務的通知信息;

2.遍歷所監聽的消息隊列(暫時只有一個隊列notifications.info),實現:

2.1.建立一個'topic'類型的消息消費者;

2.2.監聽topic指定的消息隊列(notifications.info),當進行消息消費操作的時候,將層層調用,最終實現調用方法self.process_notification,實現將接收到的通知轉換成采樣數據的格式,并進行發布;

(1)根據不同監控項調用不同插件中的process_notification方法,實現從通知中獲取監控項的采樣數據信息;

(2)實現發布監控項采樣數據樣本(File/RPC/UDP);

來看方法_setup_subscription的源碼:

  1. def _setup_subscription(self, ext, *args, **kwds):  
  2.     """        
  3.     針對上述加載的命名空間ceilometer.notification中的一個插件,執行以下操作:
  4.     1.調用方法get_exchange_topics獲取插件的ExchangeTopics序列;
  5.     class ComputeNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
  6.     class ImageBase(plugin.NotificationBase)----def get_exchange_topics;
  7.     class NetworkNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
  8.     class StackCRUD(plugin.NotificationBase)----def get_exchange_topics;
  9.     class _Base(plugin.NotificationBase)(卷)----def get_exchange_topics;
  10.     ExchangeTopics序列描述了用于連接到所監聽隊列的交換器exchange和topics;
  11.     經過分析所獲取的exchange和topics的值為:
  12.     topics = 'notifications.info'指定所要監聽的消息隊列;
  13.     exchange = nova/glance/neutron/heat/cinder來區分獲取不同服務的通知信息;
  14.     2.遍歷所監聽的消息隊列(暫時只有一個隊列notifications.info),實現:
  15.     2<span style="font-family:KaiTi_GB2312;">.</span>1.建立一個'topic'類型的消息消費者;
  16.     2.2.監聽topic指定的消息隊列(notifications.info),當進行消息消費操作的時候,將層層調用,
  17.     最終實現調用方法self.process_notification,實現將接收到的通知轉換成采樣數據的格式,并進行發布;
  18.     (1).根據不同監控項和具體插件調用不同的process_notification方法,實現從通知中獲取監控項的采樣數據信息;
  19.     (2).實現發布監控項采樣數據樣本(File/RPC/UDP);
  20.     """  
  21.     handler = ext.obj  
  22.       
  23.     """
  24.     default = True
  25.     """  
  26.     ack_on_error = cfg.CONF.notification.ack_on_event_error  
  27.     LOG.debug(_('Event types from %(name)s: %(type)s'  
  28.                 ' (ack_on_error=%(error)s)') %  
  29.               {'name': ext.name,  
  30.                'type': ', '.join(handler.event_types),  
  31.                'error': ack_on_error})  
  32.   
  33.     """
  34.     調用方法get_exchange_topics獲取插件的ExchangeTopics序列;
  35.     class ComputeNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
  36.     class ImageBase(plugin.NotificationBase)----def get_exchange_topics;
  37.     class NetworkNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
  38.     class StackCRUD(plugin.NotificationBase)----def get_exchange_topics;
  39.     class _Base(plugin.NotificationBase)(卷)----def get_exchange_topics;
  40.     ExchangeTopics序列描述了用于連接到所監聽隊列的交換器exchange和topics;
  41.     經過分析所獲取的exchange和topics的值為:
  42.     topics = 'notifications.info'指定所要監聽的消息隊列;
  43.     exchange = nova/glance/neutron/heat/cinder來區分獲取不同服務的通知信息;
  44.     """  
  45.     for exchange_topic in handler.get_exchange_topics(cfg.CONF):  
  46.         """
  47.         遍歷所監聽的消息隊列(暫時只有一個隊列notifications.info);
  48.         """  
  49.         for topic in exchange_topic.topics:  
  50.             try:           
  51.                 """
  52.                 實現封裝方法callback,即self.process_notification;
  53.                 1.建立一個'topic'類型的消息消費者;
  54.                 2.監聽topic指定的消息隊列(notifications.info),當進行消息消費操作的時候,將層層調用,
  55.                   最終實現調用方法callback_wrapper,即self.process_notification;
  56.                  
  57.                 callback=self.process_notification
  58.                 將接收到的通知轉換成采樣數據的格式,并進行發布;
  59.                 1.根據不同監控項和具體插件調用不同的process_notification方法,
  60.                   實現從通知中獲取監控項的采樣數據信息;
  61.                 2.實現發布監控項采樣數據樣本(File/RPC/UDP);
  62.                 """  
  63.                 self.conn.join_consumer_pool(  
  64.                     # process_notification:將接收到的通知轉換成采樣數據的格式,并進行發布;  
  65.                     callback=self.process_notification,  
  66.                     pool_name=topic,  
  67.                     topic=topic,  
  68.                     exchange_name=exchange_topic.exchange,  
  69.                     ack_on_error=ack_on_error)  
  70.             except Exception:  
  71.                 LOG.exception(_('Could not join consumer pool'  
  72.                                 ' %(topic)s/%(exchange)s') %  
  73.                               {'topic': topic,  
  74.                                'exchange': exchange_topic.exchange})  
復制代碼

接著來看這里所調用的方法process_notification:
  1. def process_notification(self, notification):  
  2.     """
  3.     RPC endpoint for notification messages
  4.     將接收到的通知轉換成采樣數據的格式,并進行發布;
  5.     1.根據不同監控項和具體插件調用不同的process_notification方法,
  6.       實現從通知中獲取監控項的采樣數據信息;
  7.     2.實現發布監控項采樣數據樣本(File/RPC/UDP);

  8.     When another service sends a notification over the message
  9.     bus, this method receives it. See _setup_subscription().
  10.     """  
  11.     LOG.debug(_('notification %r'), notification.get('event_type'))  
  12.       
  13.     """
  14.     _process_notification_for_ext:
  15.     將接收到的通知轉換成采樣數據的格式,并進行發布;
  16.     1.根據不同監控項和具體插件調用不同的process_notification方法,
  17.       實現從通知中獲取監控項的采樣數據信息;
  18.     2.實現發布監控項采樣數據樣本(File/RPC/UDP);
  19.     """  
  20.     self.notification_manager.map(self._process_notification_for_ext,  
  21.                                   notification=notification)  
  22.   
  23.     # cfg.CONF.notification.store_events:默認值為False;  
  24.     if cfg.CONF.notification.store_events:  
  25.         # 轉換通知消息到Ceilometer Event;  
  26.         self._message_to_event(notification)  
復制代碼

再來看方法_process_notification_for_ext:
  1. def _process_notification_for_ext(self, ext, notification):  
  2.     """
  3.     將接收到的通知轉換成采樣數據的格式,并進行發布;
  4.     1.根據不同監控項和具體插件調用不同的process_notification方法,
  5.       實現從通知中獲取監控項的采樣數據信息;
  6.     2.實現發布監控項采樣數據樣本(File/RPC/UDP);
  7.      
  8.     to_samples:
  9.     根據不同監控項和具體插件調用以下process_notification方法,實現從通知中獲取監控項的采樣數據信息;
  10.     class ComputeMetricsNotificationBase----def process_notification(self, message)
  11.     class UserMetadataAwareInstanceNotificationBase----def process_notification(self, message)
  12.     class ImageCRUD(ImageCRUDBase)----def process_notification(self, message)
  13.     class Image(ImageCRUDBase)----def process_notification(self, message)
  14.     class ImageSize(ImageCRUDBase)----def process_notification(self, message)
  15.     class ImageDownload(ImageBase)----def process_notification(self, message)
  16.     class ImageServe(ImageBase)----def process_notification(self, message)
  17.     class NetworkNotificationBase(plugin.NotificationBase)----def process_notification(self, message)
  18.     class Bandwidth(NetworkNotificationBase)----def process_notification(self, message)
  19.     class StackCRUD(plugin.NotificationBase)----def process_notification(self, message)
  20.     class Volume(_Base)----def process_notification(self, message)
  21.     class VolumeSize(_Base)----def process_notification(self, message)
  22.     class HTTPRequest(plugin.NotificationBase)----def process_notification(self, message)
  23.     class NotificationService(service.DispatchedService, rpc_service.Service)----def process_notification(self, notification)
  24.      
  25.     publisher:
  26.     實現發布監控項采樣數據樣本;
  27.     1.class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
  28.     實現發布采樣數據到一個日志文件;
  29.     2.class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
  30.     通過RPC發布采樣數據;
  31.     * 從若干采樣數據信息samples中獲取提取數據形成信息格式meters,為信息的發布或存儲做準備;
  32.     * 將之前從采樣數據中提取的信息meters包裝成msg;
  33.     * 將匹配的topic,msg添加到本地隊列local_queue中,topic默認為metering;
  34.     * 實現發布本地隊列local_queue中的所有數據信息到隊列metering中;
  35.     * 其中,消息msg中封裝的'method'方法為'record_metering_data',即當消息被消費時,將會
  36.       執行方法record_metering_data,實現存儲到數據存儲系統中(數據庫);
  37.     3.class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
  38.     通過UDP發布采樣數據;
  39.      
  40.     to_samples:
  41.     根據不同監控項和具體插件調用以下process_notification方法,
  42.     實現從通知中獲取監控項的采樣數據信息;
  43.     """  
  44.     with self.pipeline_manager.publisher(context.get_admin_context()) as p:  
  45.         # FIXME(dhellmann): Spawn green thread?  
  46.         p(list(ext.obj.to_samples(notification)))  
復制代碼

再來看方法to_samples:
  1. def to_samples(self, notification):  
  2.         """
  3.         根據不同監控項和具體插件調用以下process_notification方法,
  4.         實現從通知中獲取監控項的采樣數據信息;
  5.         class ComputeMetricsNotificationBase----def process_notification(self, message)
  6.         class UserMetadataAwareInstanceNotificationBase----def process_notification(self, message)
  7.         class ImageCRUD(ImageCRUDBase)----def process_notification(self, message)
  8.         class Image(ImageCRUDBase)----def process_notification(self, message)
  9.         class ImageSize(ImageCRUDBase)----def process_notification(self, message)
  10.         class ImageDownload(ImageBase)----def process_notification(self, message)
  11.         class ImageServe(ImageBase)----def process_notification(self, message)
  12.         class NetworkNotificationBase(plugin.NotificationBase)----def process_notification(self, message)
  13.         class Bandwidth(NetworkNotificationBase)----def process_notification(self, message)
  14.         class StackCRUD(plugin.NotificationBase)----def process_notification(self, message)
  15.         class Volume(_Base)----def process_notification(self, message)
  16.         class VolumeSize(_Base)----def process_notification(self, message)
  17.         class HTTPRequest(plugin.NotificationBase)----def process_notification(self, message)
  18.         class NotificationService(service.DispatchedService, rpc_service.Service)----def process_notification(self, notification)
  19.         """  
  20.         if self._handle_event_type(notification['event_type'],  
  21.                                    self.event_types):  
  22.             return self.process_notification(notification)  
  23.         return []  
復制代碼


同樣,這里實現監控信息發布操作可選取三種模式RPC/UDP/FILE:

模式1:實現發布采樣數據到一個日志文件

  1. class FilePublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         if self.publisher_logger:   
  4.             for sample in samples:   
  5.                 self.publisher_logger.info(sample.as_dict())   
復制代碼

模式2:通過RPC發布采樣數據(具體見代碼注釋)
  1. class RPCPublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         """  
  4.         通過RPC發布信息;  
  5.         1.從若干采樣數據信息samples中獲取提取數據形成信息格式meters,為信息的發布或存儲做準備;  
  6.         2.將之前從采樣數據中提取的信息meters包裝成msg;  
  7.         3.將匹配的topic,msg添加到本地隊列local_queue中,topic默認為metering;  
  8.         4.實現發布本地隊列local_queue中的所有數據信息到隊列metering中;  
  9.         5.其中,消息msg中封裝的'method'方法為'record_metering_data',即當消息被消費時,將會  
  10.           執行方法record_metering_data,實現存儲到數據存儲系統中(數據庫);  
  11.         """   
  12.         
  13.         # 從若干采樣數據信息中獲取提取數據形成信息格式,為信息的發布或存儲做準備;   
  14.         meters = [   
  15.             # meter_message_from_counter:   
  16.             # 為一個監控采樣數據做好準備被發布或存儲;   
  17.             # 從一個采樣數據信息中獲取提取信息形成msg;   
  18.             utils.meter_message_from_counter(   
  19.                 sample,   
  20.                 cfg.CONF.publisher.metering_secret)   
  21.             for sample in samples   
  22.         ]   
  23.         
  24.         # cfg.CONF.publisher_rpc.metering_topic:metering messages所使用的主題,默認為metering;   
  25.         topic = cfg.CONF.publisher_rpc.metering_topic   
  26.                
  27.         # 將之前從采樣數據中提取的信息meters包裝成msg;   
  28.         msg = {   
  29.             'method': self.target,   
  30.             'version': '1.0',   
  31.             'args': {'data': meters},   
  32.         }   
  33.                
  34.         # 將匹配的topic,msg添加到本地隊列local_queue中,topic默認為metering;   
  35.         self.local_queue.append((context, topic, msg))   
  36.         
  37.         if self.per_meter_topic:   
  38.             for meter_name, meter_list in itertools.groupby(   
  39.                     sorted(meters, key=operator.itemgetter('counter_name')),   
  40.                     operator.itemgetter('counter_name')):   
  41.                 msg = {   
  42.                     'method': self.target,   
  43.                     'version': '1.0',   
  44.                     'args': {'data': list(meter_list)},   
  45.                 }   
  46.                 topic_name = topic + '.' + meter_name   
  47.                 LOG.audit(_('Publishing %(m)d samples on %(n)s') % (   
  48.                           {'m': len(msg['args']['data']), 'n': topic_name}))   
  49.                 self.local_queue.append((context, topic_name, msg))   
  50.         
  51.         # 實現發布本地隊列local_queue中的所有數據信息;   
  52.         self.flush()  
復制代碼
  1. def flush(self):   
  2.     """  
  3.     實現發布本地隊列中的所有數據信息;  
  4.     """            
  5.     # 獲取本地隊列的數據信息;   
  6.     queue = self.local_queue   
  7.     self.local_queue = []   
  8.                
  9.     # 實現循環發布隊列queue中的信息數據;   
  10.     self.local_queue = self._process_queue(queue, self.policy) + \self.local_queue   
  11.                
  12.     if self.policy == 'queue':   
  13.         self._check_queue_length()
復制代碼
  1. @staticmethod   
  2. def _process_queue(queue, policy):   
  3.     """  
  4.     實現循環發布隊列queue中的信息數據;  
  5.     """   
  6.     while queue:   
  7.         # 取出第一位的topic、msg等數據;   
  8.         context, topic, msg = queue[0]   
  9.         try:   
  10.             # 實現遠程發布信息,不返回任何值;   
  11.             rpc.cast(context, topic, msg)   
  12.         except (SystemExit, rpc.common.RPCException):   
  13.             samples = sum([len(m['args']['data']) for n, n, m in queue])   
  14.             if policy == 'queue':   
  15.                 LOG.warn(_("Failed to publish %d samples, queue them"),samples)   
  16.                 return queue   
  17.             elif policy == 'drop':   
  18.                 LOG.warn(_("Failed to publish %d samples, dropping them"),samples)   
  19.                 return []   
  20.             # default, occur only if rabbit_max_retries > 0   
  21.             raise   
  22.         else:   
  23.             # 從隊列中刪除發布后的信息;   
  24.             queue.pop(0)   
  25.     return []  
復制代碼

模式3:通過UDP發布采樣數據(具體見代碼注釋)
  1. class UDPPublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         """  
  4.         通過UDP協議發送meter信息到服務器端,實現監控信息的發布;  
  5.         """   
  6.         for sample in samples:   
  7.             """  
  8.             為一個監控采樣數據做好準備被發布或存儲;  
  9.             從一個采樣數據信息中獲取提取信息形成msg;  
  10.             """   
  11.             msg = utils.meter_message_from_counter(   
  12.                 sample,   
  13.                 cfg.CONF.publisher.metering_secret)   
  14.             host = self.host   
  15.             port = self.port   
  16.             LOG.debug(_("Publishing sample %(msg)s over UDP to "   
  17.                         "%(host)s:%(port)d") % {'msg': msg, 'host': host,'port': port})   
  18.             """  
  19.             通過UDP協議發送meter信息到服務器端,實現監控信息的發布;  
  20.             """   
  21.             try:   
  22.                 self.socket.sendto(msgpack.dumps(msg),(self.host, self.port))   
  23.             except Exception as e:   
  24.                 LOG.warn(_("Unable to send sample over UDP"))   
  25.                 LOG.exception(e)  
復制代碼






Ceilometer項目源碼分析----ceilometer項目源碼結構分析
Ceilometer項目源碼分析----ceilometer報警器服務的實現概覽
Ceilometer項目源碼分析----ceilometer報警器狀態評估方式
Ceilometer項目源碼分析----ceilometer分布式報警系統的具體實現
Ceilometer項目源碼分析----ceilometer-alarm-notifier服務的初始化和啟動
Ceilometer項目源碼分析----ceilometer-alarm-evaluator服務的初始化和啟動
Ceilometer項目源碼分析----ceilometer-agent-central服務的初始化和啟動
Ceilometer項目源碼分析----ceilometer-agent-compute服務的初始化和啟動
Ceilometer項目源碼分析----ceilometer-agent-notification服務的初始化和啟動
Ceilometer項目源碼分析----ceilometer-collector服務的初始化和啟動







歡迎加入about云群90371779322273151432264021 ,云計算愛好者群,亦可關注about云騰訊認證空間||關注本站微信
您需要登錄后才可以回帖 登錄 | 立即注冊

本版積分規則

關閉

推薦上一條 /4 下一條

QQ|小黑屋|about云開發-學問論壇|社區 ( 京ICP備12023829號 )

GMT+8, 2020-2-20 07:46 , Processed in 1.078125 second(s), 23 queries , Gzip On.

Powered by Discuz! X3.4 Licensed

© 2018 Comsenz Inc.Designed by u179

快速回復 返回頂部 返回列表
蓝月亮四肖中特 黑龙江11选5奖金 快乐12开奖结果查询辽宁省 贵州11选5牛人彩民 足球网站 威尼斯崛起赚钱论坛 什么是股票 棒球比分直播美式足球 宁夏11选5怎么中奖 在手机买彩票网站是哪个 大丰彩票网址