Graceful stopping of RabbitMQ AMQP notification listener

Anil Jangam anilj.mailing at gmail.com
Wed Sep 2 07:12:33 UTC 2020


Hi,

I have coded OpenStack AMQP listener following the example and it is
working fine.
https://github.com/gibizer/nova-notification-demo/blob/master/ws_forwarder.py

The related code snippets of the NotificationHandler class are shown as
follows.

# Initialize the AMQP listener
def init(self, cluster_ip, user, password, port):
    cfg.CONF()
    cluster_url = "rabbit://" + user + ":" + password + "@" +
cluster_ip + ":" + port + "/"
    transport = oslo_messaging.get_notification_transport(cfg.CONF,
url=cluster_url)
    targets = [
        oslo_messaging.Target(topic='versioned_notifications'),
    ]
    endpoints = [self.__endpoint]

    # Initialize the notification listener
    try:
        self.__amqp_listener =
oslo_messaging.get_notification_listener(transport,
                                                                        targets,

 endpoints,

 executor='threading')
    except NotImplementedError as err:
        LOGGER.error("Failed to initialize the notification listener
{}".format(err))
        return False

    LOGGER.debug("Initialized the notification listener {}".format(cluster_url))
    return True

# Arm the compute event listeners
def start_amqp_event_listener(self):
    # Start the notification handler
    LOGGER.debug("Started the OpenStack notification handler")
    self.__amqp_listener.start()

# Disarm the compute event listeners
def stop_amqp_event_listener(self):
    LOGGER.debug("Stopping the OpenStack notification handler")
    if self.__amqp_listener is not None:
        self.__amqp_listener.stop()

I am using this interface from a new process handler function, however,
when I invoke the stop_amqp_eent_listener() method, my process hangs. It
does not terminate naturally.
I verified that the self.__amqp_listener.stop() function is not returning.
Is there anything missing in this code? Is there any specific consideration
when calling the listener from a new process?

Can someone provide a clue?

# Stop the worker
def stop(self):
    # Stop the AMQP notification handler
    self.__amqp_handler.stop_amqp_event_listener()
    LOGGER.debug("Stopped the worker for
{}".format(self.__ops_conn_info.cluster_ip))


/anil.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.openstack.org/pipermail/openstack-discuss/attachments/20200902/b72ae94a/attachment-0001.html>


More information about the openstack-discuss mailing list