I have coded OpenStack AMQP listener following the example and it is working fine.
The related code snippets of the NotificationHandler class are shown as follows.
# Initialize the AMQP listener
def init(self, cluster_ip, user, password, port):
cfg.CONF()
cluster_url = "rabbit://" + user + ":" + password + "@" + cluster_ip + ":" + port + "/"
transport = oslo_messaging.get_notification_transport(cfg.CONF, url=cluster_url)
targets = [
oslo_messaging.Target(topic='versioned_notifications'),
]
endpoints = [self.__endpoint]
# Initialize the notification listener
try:
self.__amqp_listener = oslo_messaging.get_notification_listener(transport,
targets,
endpoints,
executor='threading')
except NotImplementedError as err:
LOGGER.error("Failed to initialize the notification listener {}".format(err))
return False
LOGGER.debug("Initialized the notification listener {}".format(cluster_url))
return True
# Arm the compute event listeners
def start_amqp_event_listener(self):
# Start the notification handler
LOGGER.debug("Started the OpenStack notification handler")
self.__amqp_listener.start()
# Disarm the compute event listeners
def stop_amqp_event_listener(self):
LOGGER.debug("Stopping the OpenStack notification handler")
if self.__amqp_listener is not None:
self.__amqp_listener.stop()
I am using this interface from a new process handler function, however, when I invoke the stop_amqp_eent_listener() method, my process hangs. It does not terminate naturally.
I verified that the self.__amqp_listener.stop() function is not returning. Is there anything missing in this code? Is there any specific consideration when calling the listener from a new process?
Can someone provide a clue?
/anil.