Graceful stopping of RabbitMQ AMQP notification listener
Hi,

I found the issue: it was the way I was starting and stopping the listener. The additional logs below helped me understand this deadlock, and I have fixed the issue in my app.

```
OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,309-amqpdriver.py-322 - AMQPListener connection consume
OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,310-connection.py-712 - heartbeat_tick : for connection b616c1205a61466a94e4ae2e79e6ba84
OPS_WORKER_0--DEBUG-2020-09-02 02:39:30,310-connection.py-734 - heartbeat_tick : Prev sent/recv: 8/8, now - 8/8, monotonic - 32.014996083, last_heartbeat_sent - 0.97555832, heartbeat int. - 60 for connection b616c1205a61466a94e4ae2e79e6ba84
MainProcess--WARNING-2020-09-02 02:39:34,108-server.py-127 - Possible hang: stop is waiting for start to complete
MainProcess--DEBUG-2020-09-02 02:39:34,117-server.py-128 -   File "/testprogs/python/oslo_notif/main.py", line 33, in <module>
    main()
  File "/testprogs/python/oslo_notif/main.py", line 29, in main
    worker.stop()
  File "/testprogs/python/oslo_notif/oslo_worker.py", line 85, in stop
    self.__amqp_handler.stop_amqp_event_listener()
  File "/testprogs/python/oslo_notif/oslo_notif_handler.py", line 184, in stop_amqp_event_listener
    self.__amqp_listener.stop()
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 264, in wrapper
    log_after, timeout_timer)
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 163, in wait_for_completion
    msg, log_after, timeout_timer)
  File "/pyvenv37/lib/python3.7/site-packages/oslo_messaging/server.py", line 128, in _wait
```

/anil.

On Wed, Sep 2, 2020 at 12:12 AM Anil Jangam <anilj.mailing@gmail.com> wrote:
Hi,
I have coded an OpenStack AMQP listener following this example, and it is working fine.
https://github.com/gibizer/nova-notification-demo/blob/master/ws_forwarder.p...
The relevant code snippets from the NotificationHandler class are shown below.
```python
# Imports and logger assumed by the snippets below
import logging

import oslo_messaging
from oslo_config import cfg

LOGGER = logging.getLogger(__name__)


# Initialize the AMQP listener
def init(self, cluster_ip, user, password, port):
    cfg.CONF()
    cluster_url = "rabbit://" + user + ":" + password + "@" + cluster_ip + ":" + port + "/"
    transport = oslo_messaging.get_notification_transport(cfg.CONF, url=cluster_url)
    targets = [
        oslo_messaging.Target(topic='versioned_notifications'),
    ]
    endpoints = [self.__endpoint]

    # Initialize the notification listener
    try:
        self.__amqp_listener = oslo_messaging.get_notification_listener(
            transport, targets, endpoints, executor='threading')
    except NotImplementedError as err:
        LOGGER.error("Failed to initialize the notification listener {}".format(err))
        return False

    LOGGER.debug("Initialized the notification listener {}".format(cluster_url))
    return True
```
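The `init()` method builds the transport URL by plain string concatenation. That works as long as `port` is passed as a string and the credentials contain no URL-reserved characters; an `int` port raises `TypeError`, and a password containing characters such as `/`, `?` or `#` produces a malformed URL. A minimal sketch of a safer builder, where `build_rabbit_url` is a hypothetical helper, and assuming your oslo.messaging release percent-decodes credentials when it parses the URL (worth checking for your version):

```python
from urllib.parse import quote


def build_rabbit_url(user, password, host, port, vhost=""):
    """Return a rabbit:// transport URL (hypothetical helper).

    User, password and vhost are percent-encoded so reserved characters
    cannot break the URL; formatting the port also accepts an int.
    """
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    vhost_enc = quote(vhost, safe="")
    return f"rabbit://{user_enc}:{password_enc}@{host}:{port}/{vhost_enc}"


# The result would replace the concatenated cluster_url, e.g.:
#   transport = oslo_messaging.get_notification_transport(
#       cfg.CONF, url=build_rabbit_url(user, password, cluster_ip, port))
print(build_rabbit_url("openstack", "p/ss#word", "10.0.0.5", 5672))
# rabbit://openstack:p%2Fss%23word@10.0.0.5:5672/
```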
```python
# Arm the compute event listeners
def start_amqp_event_listener(self):
    # Start the notification handler
    LOGGER.debug("Started the OpenStack notification handler")
    self.__amqp_listener.start()


# Disarm the compute event listeners
def stop_amqp_event_listener(self):
    LOGGER.debug("Stopping the OpenStack notification handler")
    if self.__amqp_listener is not None:
        self.__amqp_listener.stop()
```
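The hang discussed in this thread comes from the ordering that oslo.messaging enforces between these two calls: the follow-up log shows `Possible hang: stop is waiting for start to complete`, i.e. `stop()` blocks until `start()` has completed. The stand-in class below (hypothetical, not the oslo.messaging implementation) reproduces only that ordering with `threading.Event`; its `stop()` takes a timeout so the demo itself cannot hang.

```python
import threading


class OrderedListenerSketch:
    """Hypothetical stand-in reproducing only the start/stop ordering.

    stop() waits until start() has completed on *this* object, which is
    what the "stop is waiting for start to complete" log line describes.
    A timeout is added here so the demo cannot hang.
    """

    def __init__(self):
        self._started = threading.Event()
        self.stopped = False

    def start(self):
        # A real listener would begin consuming messages here.
        self._started.set()

    def stop(self, timeout=None):
        # Block until start() has completed on this same object.
        if not self._started.wait(timeout):
            return False  # start() never completed on this object
        self.stopped = True
        return True


listener = OrderedListenerSketch()
print(listener.stop(timeout=0.1))  # False: start() was never called on this object
listener.start()
print(listener.stop(timeout=0.1))  # True
```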
I am using this interface from a handler function that runs in a new process. However, when I invoke the stop_amqp_event_listener() method, my process hangs and never terminates. I verified that the call to self.__amqp_listener.stop() does not return. Is anything missing from this code? Are there specific considerations when using the listener from a new process?
Can someone provide a clue?
```python
# Stop the worker
def stop(self):
    # Stop the AMQP notification handler
    self.__amqp_handler.stop_amqp_event_listener()
    LOGGER.debug("Stopped the worker for {}".format(self.__ops_conn_info.cluster_ip))
```
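Whatever the root cause, a worker's `stop()` can avoid blocking forever by bounding how long it waits for the listener to stop. A minimal sketch, assuming a hypothetical helper `call_with_timeout` that runs the call in a daemon thread; it does not cancel a stuck call, it only lets the caller notice the problem, log it and carry on.

```python
import threading
import time


def call_with_timeout(fn, timeout):
    """Run fn() in a daemon thread and wait at most `timeout` seconds.

    Returns True if fn() finished (returned or raised) in time, False
    otherwise. A stuck fn() keeps running in the background, but as a
    daemon thread it will not keep the process alive at exit.
    """
    t = threading.Thread(target=fn, daemon=True)
    t.start()
    t.join(timeout)
    return not t.is_alive()


print(call_with_timeout(lambda: None, timeout=1.0))           # True
print(call_with_timeout(lambda: time.sleep(5), timeout=0.1))  # False
```

Inside the worker this might read `if not call_with_timeout(self.__amqp_handler.stop_amqp_event_listener, 10): LOGGER.warning(...)`, turning a silent hang into a logged warning.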
/anil.
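The follow-up reply does not show the final code, but its log lines point at the likely cause: the listener was consuming in the `OPS_WORKER_0` process, while `stop()` was called from `MainProcess`. A child process works on its own copy of the listener object, so a `start()` that ran in the child is never recorded as complete on the parent's copy, and the parent's `stop()` waits indefinitely. One pattern that avoids this (a sketch, not necessarily the author's fix) keeps the listener's whole life cycle inside the worker process: create it, `start()`, wait for a shutdown signal, `stop()`, then `wait()`, which oslo.messaging documents as the call that blocks until in-flight messages are processed after `stop()`. The parent only sets a `multiprocessing.Event`. `FakeListener`, `worker_main` and `run_worker` are hypothetical stand-ins, and the `"fork"` start method assumes a POSIX system.

```python
import multiprocessing as mp
import threading


class FakeListener:
    """Hypothetical stand-in for the notification listener.

    Mimics only the ordering seen in the log: stop() waits for start().
    """

    def __init__(self):
        self._started = threading.Event()

    def start(self):
        self._started.set()

    def stop(self):
        self._started.wait()  # would hang forever if start() never ran on this object

    def wait(self):
        pass  # a real listener would drain in-flight messages here


def worker_main(stop_event):
    """Body of the worker process: the listener lives entirely in here."""
    listener = FakeListener()
    listener.start()
    stop_event.wait()  # block until the parent asks for shutdown
    listener.stop()    # same object, same process as start(): returns at once
    listener.wait()


def run_worker(timeout=10.0):
    """Start a worker, signal it to stop, and return its exit code."""
    ctx = mp.get_context("fork")  # assumption: POSIX; use "spawn" on Windows
    stop_event = ctx.Event()
    proc = ctx.Process(target=worker_main, args=(stop_event,))
    proc.start()
    stop_event.set()  # the parent only signals; it never calls listener.stop()
    proc.join(timeout)
    return proc.exitcode


if __name__ == "__main__":
    print(run_worker())  # 0 when the worker shut down cleanly
```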