[openstack-dev] Taskflow remote worker model ftw!

Joshua Harlow harlowja at yahoo-inc.com
Mon Mar 10 21:23:38 UTC 2014

Hi all,

I'd just like to let everyone know about a new feature in taskflow that I think will be beneficial to various projects (by reducing the duplication of similar code across projects that accomplish the same feature set). The new feature is the ability to run tasks in remote workers (the task transitions and state persistence are still done in an 'orchestrating' engine). This means that the engine no longer has to run tasks locally or in threads (or greenthreads) but can run tasks on remote machines (anything that can be connected to an MQ via kombu; TBD when this becomes oslo.messaging).

Here is a simple example that might show how this works, for folks that have some time to try it out.


Pre-setup: git clone the taskflow repo and install it (in a venv or elsewhere), then install an MQ server (rabbitmq for example).


Let's now create two basic tasks (one that says hello and one that says goodbye).

class HelloWorldTask(task.Task):
    default_provides = "hi_happened"

    def execute(self):
        LOG.info('hello world')
        return time.time()

class GoodbyeWorldTask(task.Task):
    default_provides = "goodbye_happened"

    def execute(self, hi_happened):
        LOG.info('goodbye world (hi said on %s)', hi_happened)
        return time.time()

* Notice how the GoodbyeWorldTask requires an input 'hi_happened' (which is produced by the HelloWorldTask).

Now let's create a workflow that combines these two together.

f = linear_flow.Flow("hi-bye")
f.add(HelloWorldTask())
f.add(GoodbyeWorldTask())



Notice here that we have specified a linear runtime order (that is, hello will be said before goodbye). This order is also inherent in the dependency ordering: the goodbye task requires 'hi_happened' to run, and the only way to satisfy that dependency is to run the hello world task before the goodbye task.

*  If you are wondering what the heck this is (or why it is useful to have these little task and flow classes) check out https://wiki.openstack.org/wiki/TaskFlow#Structure
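
For anyone who wants to see the dependency idea in isolation, here is a minimal sketch in plain Python. It does not use taskflow at all: the run_in_dependency_order helper and the (provides, function) pairs are made up for illustration, and this is not how taskflow resolves ordering internally. It only shows that 'hi_happened' being required by the goodbye task is enough to force hello to run first.

```python
import inspect
import time


def hello_world():
    # Provides 'hi_happened'; needs no inputs.
    return time.time()


def goodbye_world(hi_happened):
    # Requires 'hi_happened'; provides 'goodbye_happened'.
    return time.time()


def run_in_dependency_order(tasks):
    """Run (provides_name, function) pairs once their inputs exist.

    Hypothetical helper for illustration only; not taskflow's algorithm.
    """
    results = {}
    order = []
    pending = list(tasks)
    while pending:
        progressed = False
        for item in list(pending):
            provides, func = item
            # The function's argument names are treated as its requirements.
            requires = list(inspect.signature(func).parameters)
            if all(name in results for name in requires):
                kwargs = {name: results[name] for name in requires}
                results[provides] = func(**kwargs)
                order.append(func.__name__)
                pending.remove(item)
                progressed = True
        if not progressed:
            missing = [provides for provides, _func in pending]
            raise RuntimeError("unsatisfiable requirements for %r" % missing)
    return order, results


# Listed in the "wrong" order on purpose; the requirement fixes it.
order, results = run_in_dependency_order([
    ("goodbye_happened", goodbye_world),
    ("hi_happened", hello_world),
])
print(order)
```

Even though goodbye is listed first, it cannot run until something has produced 'hi_happened', so hello ends up running before it.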

Now the fun begins!

We need a worker to accept requests to run tasks on, so let's create a small function that does just that.

def run_worker():

    worker_conf = dict(MQ_CONF)


        # These are the available tasks that this worker has access to execute.

        'tasks': [





    # Start this up and stop it on ctrl-c

    worker = remote_worker.Worker(**worker_conf)

    runner = threading.Thread(target=worker.run)



    while True:



        except KeyboardInterrupt:




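The "start this up and stop it on ctrl-c" part of the worker function can be sketched on its own as below. This is only an illustration of the thread-plus-KeyboardInterrupt pattern: FakeWorker is a hypothetical stand-in with run() and stop() methods (it is not the taskflow worker class), and the serve helper and its wait parameter are names invented for this sketch.

```python
import threading
import time


class FakeWorker(object):
    """Hypothetical stand-in with run()/stop(); not the taskflow worker."""

    def __init__(self):
        self._stop_event = threading.Event()
        self.stopped = False

    def run(self):
        # Block until asked to stop (a real worker would consume requests).
        self._stop_event.wait()
        self.stopped = True

    def stop(self):
        self._stop_event.set()


def serve(worker, wait=lambda: time.sleep(0.5)):
    # Run the worker in a background thread so that the main thread stays
    # free to notice ctrl-c.
    runner = threading.Thread(target=worker.run)
    runner.start()
    try:
        while True:
            wait()
    except KeyboardInterrupt:
        pass
    finally:
        # Ask the worker to finish and wait for its thread to exit.
        worker.stop()
        runner.join()
```

Calling serve(FakeWorker()) blocks until ctrl-c is pressed, then stops the worker and joins its thread before returning.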

And of course we need a function that will perform the orchestration of the remote (or local) tasks. This function starts the whole execution flow by taking the workflow defined above and combining it with an engine that will run the individual tasks (and transfer data between those tasks as needed).

* For those still wondering what an engine is (or what it offers) check out https://wiki.openstack.org/wiki/TaskFlow#Engines and https://wiki.openstack.org/wiki/TaskFlow/Patterns_and_Engines/Persistence#Big_Picture (which hopefully will make it easier to understand why the concept exists in the first place).
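
To make the split of responsibilities concrete, here is a rough sketch of the model using only the Python standard library. Two queue.Queue objects stand in for the MQ, a thread stands in for the remote worker, and the plan, storage and state names are simplifications chosen for this example (none of this is taskflow's actual protocol or API). The point is just that the worker only executes what it is asked to, while the engine owns ordering, state transitions and results.

```python
import queue
import threading

requests = queue.Queue()   # stands in for the MQ queue a worker listens on
responses = queue.Queue()  # stands in for the engine's reply queue

# Tasks this (pretend) worker is able to execute, keyed by name.
WORKER_TASKS = {
    "hello": lambda: "hi",
    "goodbye": lambda hi_happened: "bye (after %s)" % hi_happened,
}


def worker_loop():
    # The worker only executes requests; it keeps no workflow state.
    while True:
        request = requests.get()
        if request is None:  # shutdown sentinel
            break
        name, kwargs = request
        responses.put((name, WORKER_TASKS[name](**kwargs)))


def run_engine():
    # The "engine" decides what runs when and stores every result.
    plan = [
        ("hello", [], "hi_happened"),
        ("goodbye", ["hi_happened"], "goodbye_happened"),
    ]
    storage = {}
    states = []
    for name, requires, provides in plan:
        states.append((name, "RUNNING"))
        # Send the task name plus the inputs it needs to the worker.
        requests.put((name, {key: storage[key] for key in requires}))
        _name, result = responses.get(timeout=5)
        storage[provides] = result
        states.append((name, "SUCCESS"))
    return storage, states


worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()
storage, states = run_engine()
requests.put(None)  # tell the worker to shut down
worker.join()
print(storage)
```

Swapping the in-process queues for a real message queue is what lets the worker live on another machine; the engine side of the loop does not need to know where the task actually ran.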

def run_engine():

    # Make some remote tasks happen

    f = lf.Flow("test")



    # Create an in-memory storage area where intermediate results will be

    # saved (you can change this to a persistent backend if desired).

    backend = impl_memory.MemoryBackend({})

    _logbook, flowdetail = pu.temporary_flow_detail(backend)

    engine_conf = dict(MQ_CONF)


        # This identifies what workers are accessible via what queues, this

        # will be made better soon with reviews https://review.openstack.org/#/c/75094/

        # or similar.

        'workers_info': {

            'work': [






    LOG.info("Running workflow %s", f)

    # Now run the engine.

    e = engine.WorkerBasedActionEngine(f, flowdetail, backend, engine_conf)

    with logging_listener.LoggingListener(e, level=logging.INFO):


    # See the results received.

    print("Final results: %s" % (e.storage.fetch_all()))

Now that we have these two functions we can actually start the worker and the engine and watch the action happen. To avoid having to add a little more boilerplate yourself (imports and such), the code above can be found at http://paste.openstack.org/show/73071/.

To try it, download the above paste to a file named 'test.py', modify the MQ_SERVER global to point to your MQ host, and then run the following.

$ python test.py worker # starts a worker

$ python test.py # starts the main engine

You should start to see state transitions happening and the final engine result being produced by the engine coordinating calls with remote-workers.

* For the engine the output should be similar to http://paste.openstack.org/show/73072/, for the worker it should be similar to http://paste.openstack.org/show/73073/

Hopefully this new model can be useful in the future to heat, glance, ... and any others that would like to take advantage of said functionality (taskflow is a library on pypi that anyone can use, and is encouraged to use).

Questions, concerns, and any other comments are welcome (here or in #openstack-state-management).

