Graceful stopping of RabbitMQ AMQP notification listener

Anil Jangam anilj.mailing at gmail.com
Wed Sep 2 09:47:27 UTC 2020


Hi,

I found the issue: it was the way I was starting and stopping the
listener. The additional logs below helped me understand this
deadlock. I have fixed the issue in my app.

OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,309-amqpdriver.py-322 - AMQPListener connection consume
OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,310-connection.py-712 - heartbeat_tick : for connection b616c1205a61466a94e4ae2e79e6ba84
OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,310-connection.py-734 - heartbeat_tick : Prev sent/recv: 8/8, now - 8/8, monotonic - 32.014996083, last_heartbeat_sent - 0.97555832, heartbeat int. - 60 for connection b616c1205a61466a94e4ae2e79e6ba84
MainProcess--WARNING-2020-09-02 02:39:34,108-server.py-127 - Possible hang: stop is waiting for start to complete
MainProcess--DEBUG-2020-09-02 02:39:34,117-server.py-128 -
  File "/testprogs/python/oslo_notif/main.py", line 33, in <module>
    main()
  File "/testprogs/python/oslo_notif/main.py", line 29, in main
    worker.stop()
  File "/testprogs/python/oslo_notif/oslo_worker.py", line 85, in stop
    self.__amqp_handler.stop_amqp_event_listener()
  File "/testprogs/python/oslo_notif/oslo_notif_handler.py", line 184, in stop_amqp_event_listener
    self.__amqp_listener.stop()
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 264, in wrapper
    log_after, timeout_timer)
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 163, in wait_for_completion
    msg, log_after, timeout_timer)
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 128, in _wait
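
The "Possible hang: stop is waiting for start to complete" warning means that stop() was blocked waiting for a start() that had not finished. The sketch below is a simplified, stdlib-only model of that ordering (the OrderedLifecycle class is hypothetical and is not oslo.messaging's actual implementation): stop() waits while start() is still in progress, and when given a timeout it reports failure instead of hanging.

```python
import threading


class OrderedLifecycle:
    """Hypothetical model of a server whose stop() waits for an
    in-progress start() to finish (not oslo.messaging's real code)."""

    def __init__(self, start_body):
        self._start_body = start_body  # work performed inside start()
        self._cond = threading.Condition()
        self._starting = False
        self.running = False

    def start(self):
        with self._cond:
            self._starting = True
        succeeded = False
        try:
            self._start_body()  # may block, e.g. while connecting to a broker
            succeeded = True
        finally:
            with self._cond:
                self._starting = False
                self.running = succeeded
                self._cond.notify_all()  # wake any stop() waiting on us

    def stop(self, timeout=None):
        """Return True once stopped, or False if start() is still running
        after `timeout` seconds. With timeout=None this can wait forever."""
        with self._cond:
            if not self._cond.wait_for(lambda: not self._starting, timeout):
                return False
            self.running = False
            return True
```

If start() never completes from the point of view of the code calling stop(), an untimed stop() waits indefinitely, which is consistent with the hang described in this thread.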

/anil.


On Wed, Sep 2, 2020 at 12:12 AM Anil Jangam <anilj.mailing at gmail.com> wrote:

> Hi,
>
> I have written an OpenStack AMQP listener following the example below, and
> it is working fine.
>
> https://github.com/gibizer/nova-notification-demo/blob/master/ws_forwarder.py
>
> The relevant code snippets from the NotificationHandler class are as
> follows.
>
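> import logging
>
> import oslo_messaging
> from oslo_config import cfg
>
> LOGGER = logging.getLogger(__name__)
>
> # Initialize the AMQP listener
> def init(self, cluster_ip, user, password, port):
>     cfg.CONF()
>     # format() also accepts an int port, unlike "+" string concatenation
>     cluster_url = "rabbit://{}:{}@{}:{}/".format(user, password, cluster_ip, port)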
>     transport = oslo_messaging.get_notification_transport(cfg.CONF, url=cluster_url)
>     targets = [
>         oslo_messaging.Target(topic='versioned_notifications'),
>     ]
>     endpoints = [self.__endpoint]
>
>     # Initialize the notification listener
>     try:
>         self.__amqp_listener = oslo_messaging.get_notification_listener(transport,
>                                                                         targets,
>                                                                         endpoints,
>                                                                         executor='threading')
>     except NotImplementedError as err:
>         LOGGER.error("Failed to initialize the notification listener {}".format(err))
>         return False
>
>     LOGGER.debug("Initialized the notification listener {}".format(cluster_url))
>     return True
>
> # Arm the compute event listeners
> def start_amqp_event_listener(self):
>     # Start the notification handler
>     LOGGER.debug("Started the OpenStack notification handler")
>     self.__amqp_listener.start()
>
> # Disarm the compute event listeners
> def stop_amqp_event_listener(self):
>     LOGGER.debug("Stopping the OpenStack notification handler")
>     if self.__amqp_listener is not None:
>         self.__amqp_listener.stop()
>
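A side note on the URL built in init(): the credentials are inserted verbatim, so a password containing '@', ':' or '/' would produce a malformed transport URL. A minimal sketch of a hypothetical build_rabbit_url() helper that percent-encodes user and password, assuming the oslo.messaging transport URL parser decodes them again (worth confirming for the version in use):

```python
from urllib.parse import quote


def build_rabbit_url(user, password, host, port):
    """Hypothetical helper: build a rabbit:// transport URL.

    User and password are percent-encoded (safe="" also encodes '/'),
    and port may be given as an int or a str.
    """
    return "rabbit://{}:{}@{}:{}/".format(
        quote(user, safe=""), quote(password, safe=""), host, int(port))
```

The result would then be passed as url= to oslo_messaging.get_notification_transport().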
> I am calling this interface from a handler function running in a new
> process. However, when I invoke the stop_amqp_event_listener() method, my
> process hangs and does not terminate.
> I verified that self.__amqp_listener.stop() never returns. Is anything
> missing from this code? Are there any specific considerations when using
> the listener from a new process?
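One way to confirm that shutdown is blocked inside stop(), without freezing the whole process, is to call stop() from a helper thread and bound the wait. A sketch with a hypothetical stop_with_timeout() helper; `listener` is assumed to be any object with a stop() method, such as the notification listener above:

```python
import threading


def stop_with_timeout(listener, timeout):
    """Hypothetical helper: call listener.stop() in a background thread.

    Returns True if stop() returned within `timeout` seconds, else False.
    An exception raised inside stop() also counts as "not completed".
    """
    done = threading.Event()

    def _stop():
        listener.stop()
        done.set()

    # Daemon thread, so a stop() that never returns cannot keep the
    # interpreter alive at exit.
    threading.Thread(target=_stop, daemon=True).start()
    return done.wait(timeout)
```

A False result only tells you that stop() is stuck; the helper thread is left behind, so this is a diagnostic aid rather than a clean shutdown.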
>
> Can someone provide a clue?
>
> # Stop the worker
> def stop(self):
>     # Stop the AMQP notification handler
>     self.__amqp_handler.stop_amqp_event_listener()
>     LOGGER.debug("Stopped the worker for {}".format(self.__ops_conn_info.cluster_ip))
>
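Since the fix turned out to be in how the listener was started and stopped, one defensive pattern (an assumption here, not necessarily the fix used above) is to make sure the process that called start() is the one that calls stop(), followed by wait(). A sketch with a hypothetical ListenerLifecycle wrapper, assuming the wrapped object exposes start(), stop() and wait(), as the object returned by oslo_messaging.get_notification_listener() does:

```python
import os


class ListenerLifecycle:
    """Hypothetical wrapper that keeps start()/stop() on one listener
    inside one process."""

    def __init__(self, listener):
        self._listener = listener
        self._owner_pid = None  # PID of the process that called start()

    def start(self):
        self._listener.start()
        self._owner_pid = os.getpid()

    def stop(self):
        if self._owner_pid is None:
            raise RuntimeError("stop() called before start()")
        if self._owner_pid != os.getpid():
            # Threads are not inherited across fork, so a copy of the
            # listener in another process is not the running listener.
            raise RuntimeError(
                "listener started in PID %d, stop() called from PID %d"
                % (self._owner_pid, os.getpid()))
        self._listener.stop()  # ask the listener to stop consuming
        self._listener.wait()  # then wait for in-flight processing to end
        self._owner_pid = None
```

Failing loudly on a PID mismatch turns a silent hang into an immediate, explainable error.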
>
> /anil.
>


More information about the openstack-discuss mailing list