neutron/common/rpc.py
def init(conf):
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = oslo_messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer()
NOTIFIER = oslo_messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
- **注意:**给
oslo_messaging.get_notification_transport
传递的参数中url
为 None,则 Notifier 会采用同 rpc(oslo_messaging.message)相同的一样的 url。
我们先看一下 /etc/neutron/neutron.conf 中关于 oslo_messaging.Notifier 的配置:
[oslo_messaging_notifications]
#driver =
#transport_url = <None>
#topics = notifications
这三个选项分别对应着 Notifier 的:
- 驱动(log、messaging....)
- url(应该采用哪种 transport)
- topic(传递消息时用到的 topic)
在 devstack 默认的情况下是没有设置 driver 的,也就是虽然实例化了 Notifier,但是却无法发送消息。
我们举例说在 neutron/api/v2/base.py 中 Contoller
中使用的 Notifer:
self._notifier = n_rpc.get_notifier('network')
def get_notifier(service=None, host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or cfg.CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)
也就是在初始化 Notifier 的基础上更新了 publisher_id
,相当于创建一个新的 Notifier
实例(这样子可以避免驱动的加载)。
self._notifier.info(request.context,
self._resource + '.create.start',
body)
这里就会发送 Notifier 的消息。
- 发送端:
from oslo_config import cfg
import oslo_messaging
transport = oslo_messaging.get_transport(cfg.CONF)
notifier = oslo_messaging.Notifier(transport, driver='messaging', publisher_id='testing', topic='notifications')
notifier.info({'some': 'context'}, 'just.testing', {'heavy': 'payload'})
- 接收端
import json
from oslo_config import cfg
import oslo_messaging
class NotificationEndpoint(object):
def info(self, ctxt, publisher_id, event_type, payload, metadata):
print 'recv notification:'
print json.dumps(payload, indent=4)
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
None
def error(self, ctxt, publisher_id, event_type, payload, metadata):
None
transport = oslo_messaging.get_transport(cfg.CONF)
targets = [ oslo_messaging.Target(topic='notifications') ]
endpoints = [ NotificationEndpoint() ]
server = oslo_messaging.get_notification_listener(transport, targets, endpoints)
server.start()
server.wait()
- 接收到的消息
{
"heavy": "payload"
}
如果你开着 openstack 的话,有可能接收到来自 openstack 组件的消息奥。
Notifier 我翻译过
Notification Driver 我翻译过