<div>I am starting work on a blueprint [1] to create a public API for</div><div>subscribing to the message bus in a more generic way than we have for</div><div>RPC calls right now. We're already doing this in Ceilometer, and the</div>
<div>Quantum team needs it, too. Unfortunately, the way I ended up doing it</div><div>in Ceilometer was with a private method of the Connection.</div><div><br></div><div>I've created a wiki page describing the plan [2], but I wanted to</div>
<div>summarize it here on the list and get some more feedback before I</div><div>begin implementation.</div><div><br></div><div>Ceilometer and Quantum both have some unusual requirements that make</div><div>the existing API inadequate:</div>
<div><br></div><div>1. We want to listen for notifications, not RPC calls.</div><div><br></div><div>2. We need to listen for notifications from *other projects* not just</div><div> ourselves, so we need to connect to more than one exchange (in AMQP</div>
<div> parlance).</div><div><br></div><div>3. We want to distribute the load among several workers, while</div><div> ensuring that our consumption of notifications does not interrupt</div><div> the flow of those messages to anyone else who also needs them</div>
<div> (since notifications are a general integration mechanism).</div><div><br></div><div>4. While we currently want to listen for notifications, I don't want</div><div> to tie the design to notification-formatted messages so that it</div>
<div> doesn't work for other non-RPC messages.</div><div><br></div><div>My current plann is to create a new method of Connection:</div><div><br></div><div>def join_consumer_pool(self, callback, pool_name, topic, exchange_name=cfg.CONF.control_exchange):</div>
<div> """Register as a member of a group of consumers for a</div><div> given topic from the specified routing exchange.</div><div><br></div><div> Exactly one member of a given pool will receive each message.</div>
<div><br></div><div> A message will be delivered to multiple pools, if more than one is created.</div><div> """</div><div><br></div><div>- The callback argument is a callable object to be invoked with a</div>
<div> single argument, the incoming message (with the JSON unpacked).</div><div><br></div><div>- In an AMQP implementation the pool_name argument will be used to</div><div> create or join a queue. Workers in the same pool will use the same</div>
<div> queue, ensuring that messages can be delivered to more than one</div><div> load-balanced application.</div><div><br></div><div>- The topic argument is the usual routing value (i.e., for</div><div> notifications it would be "<a href="http://notifications.info">notifications.info</a>").</div>
<div><br></div><div>- The exchange_name should default to the exchange for the current app</div><div> but passing a value explicitly also allows an application to</div><div> communicate with more than one exchange.</div>
<div><br></div><div>ZMQ-based configurations do not currently have the concept of an</div><div>exchange, but after discussing it with Eric we think it would be</div><div>useful to add it and pass the value to the matchmaker as a way to</div>
<div>filter the returned clients. I'm not sure yet if we want to use the</div><div>term "exchange" or come up with something more generic.</div><div><br></div><div>There are more details about some of the implementation details in the</div>
<div>wiki [2], if you're interested in going deeper.</div><div><br></div><div>I will start working on the code next week, so please let me know if</div><div>you have any concerns with the plan, names, etc.</div><div><br>
</div><div>Thanks,</div><div>Doug</div><div><br></div><div><br></div><div><br></div><div>[1] <a href="http://wiki.openstack.org/Ceilometer/blueprints/move-listener-framework-oslo">http://wiki.openstack.org/Ceilometer/blueprints/move-listener-framework-oslo</a></div>
<div>[2] <a href="https://blueprints.launchpad.net/ceilometer/+spec/move-listener-framework-oslo">https://blueprints.launchpad.net/ceilometer/+spec/move-listener-framework-oslo</a></div><div><br></div>