<div dir="ltr"><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">Icehouse</div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">oslo-messaging 1.3.0<div>rabbitmq-server 3.1.3</div>
<div><br></div><div>We've noticed that nova rpc calls fail often after rabbit restarts.  I've tracked it down to oslo/rabbit/kombu timing out if it's forced to reconnect to rabbit.  The code below times out waiting for a reply if the topic has been used in a previous run.  The reply always arrives the first time a topic is used, or if the topic is none.  But, the second run with the same topic will hang with this error:</div>
<div><br></div></div><blockquote style="font-family:arial,sans-serif;font-size:13.333333969116211px;margin:0px 0px 0px 40px;border:none;padding:0px">MessagingTimeout: Timed out waiting for a reply to message ID ...</blockquote>
<div style="font-family:arial,sans-serif;font-size:13.333333969116211px"><br></div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">This problem seems too basic to not be caught earlier in oslo, but the program below does really reproduce the same symptoms we see in nova when run against a live rabbit server.  What's wrong with this picture?</div>
<div style="font-family:arial,sans-serif;font-size:13.333333969116211px"><br></div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px"><br></div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">
Cheers</div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">--</div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">Noel</div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px">
<br></div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px"><br></div><div style="font-family:arial,sans-serif;font-size:13.333333969116211px"><div>#! /usr/bin/python</div><div><br></div><div>from oslo.config import cfg</div>
<div>import threading</div><div>from oslo import messaging</div><div>import logging</div><div>import time</div><div>log = logging.getLogger(__name__)</div><div><br></div><div>class OsloTest():</div><div>    def test(self):</div>
<div>        # The code below times out waiting for a reply if the topic</div><div>        # has been used in a previous run.  The reply always arrives</div><div>        # the first time a topic is used, or if the topic is none.</div>
<div>        # But, the second run with the same topic will hang with this</div><div>        # error:</div><div>        #</div><div>        # MessagingTimeout: Timed out waiting for a reply to message ID ...</div><div>        #</div>
<div>        topic  = 'will_hang_on_second_usage'</div><div>        #topic  = None # never hangs</div><div><br></div><div>        url = "%(proto)s://%(user)s:%(password)s@%(host)s/" % dict(</div><div>            proto = 'rabbit',</div>
<div>            host = '1.2.3.4',</div><div>            password = 'xxxxxxxx',</div><div>            user = 'rabbit-mq-user',</div><div>            )</div><div>        transport = messaging.get_transport(cfg.CONF, url)</div>
<div>        driver = transport._driver</div><div><br></div><div>        target = messaging.Target(topic=topic)</div><div>        listener = driver.listen(target)</div><div>        ctxt={"context": True}</div><div>
        timeout = 10</div><div><br></div><div>        def send_main():</div><div>            log.debug("sending msg")</div><div>            reply = driver.send(target,</div><div>                                ctxt,</div>
<div>                                {'send': 1},</div><div>                                wait_for_reply=True,</div><div>                                timeout=timeout)</div><div><br></div><div>            # times out if topic was not None and used before</div>
<div>            log.debug("received reply=%r" % (reply,))</div><div><br></div><div>        send_thread = threading.Thread(target=send_main)</div><div>        send_thread.daemon = True</div><div>        send_thread.start()</div>
<div><br></div><div>        msg = listener.poll()</div><div>        log.debug("received msg=%r" % (msg,))</div><div><br></div><div>        msg.reply({'reply': 1})</div><div><br></div><div>        log.debug("sent reply")</div>
<div><br></div><div>        send_thread.join()</div><div><br></div><div>if __name__ == '__main__':</div><div>    FORMAT = '%(asctime)-15s %(process)5d %(thread)5d %(filename)s %(funcName)s %(message)s'</div>
<div>    logging.basicConfig(level=logging.DEBUG, format=FORMAT)</div><div>    OsloTest().test()</div></div></div>