[openstack-dev] [TaskFlow] TaskFlow persistence

pnkk pnkk2016 at gmail.com
Sat Mar 19 15:45:18 UTC 2016


Hi Joshua,

Thanks for all your inputs.
We are using this feature successfully, but occasionally I see an issue
related to concurrency.

To give you some background: we use eventlet, and every job runs in a
separate eventlet green thread.

In the job execution code, we use TaskFlow and persist all the details of
the job.

There we fetch the backend as shown below, and we run the schema upgrade
every time a job is executed.

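        import contextlib

        from oslo_config import cfg
        from taskflow.persistence import backends

        # Inside the job-execution method:
        backend_uri = cfg.CONF.db.sqlalchemy_url
        conf = {
            'connection': backend_uri,
        }
        self.backend = backends.fetch(conf)
        # Create/upgrade the persistence schema (currently done on every job).
        with contextlib.closing(self.backend.get_connection()) as conn:
            conn.upgrade()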

Now, when two jobs start executing at the same time, I see the error below:
Failed upgrading database version
  DBAPIError: (exceptions.RuntimeError) reentrant call inside <_io.BufferedReader name=14>

We have applied eventlet monkey patching; isn't that supposed to take care
of these concurrency issues?

Below are the versions of the related modules in use:
eventlet==0.17.4
taskflow==1.26.0
SQLAlchemy==1.0.9
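One plausible mitigation, offered as an assumption rather than a confirmed fix: all green threads share one OS thread, so two jobs calling `conn.upgrade()` at the same moment may end up interleaving on the same underlying stream, which is the kind of situation CPython's "reentrant call" check reports. Running the schema upgrade only once per process, behind a lock, sidesteps that race. The sketch below uses only the standard library; `ensure_schema_upgraded` and `upgrade_fn` are hypothetical names, and in the real code `upgrade_fn` would wrap the `get_connection()` / `upgrade()` call shown above.

```python
import threading

# Module-level guard. Assumption: when eventlet monkey patching of the thread
# module is enabled, threading.Lock becomes a green-thread-aware lock, so this
# also serializes green threads.
_upgrade_lock = threading.Lock()
_upgraded = False


def ensure_schema_upgraded(upgrade_fn):
    """Run the (hypothetical) upgrade_fn at most once per process."""
    global _upgraded
    with _upgrade_lock:
        # Only the first caller performs the upgrade; later callers see the flag.
        if not _upgraded:
            upgrade_fn()
            _upgraded = True
```

Each job would then call `ensure_schema_upgraded(...)` instead of upgrading unconditionally. A simpler alternative is to run the upgrade once at service start-up, before any jobs are spawned.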

Thanks,
Kanthi

On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlowja at fastmail.com>
wrote:

> pn kk wrote:
>
>> Hi Joshua,
>>
>> Yes, sure will do that once I get some window out of my work.
>>
>> One last query (hopefully :) ): can the factory method be an instance
>> method of a class?
>>
>
> Instance methods are particularly hard to use (since they require an
> instance of an object to be useful), so I think the check you have hit is
> making sure that, if the flow factory is called to recreate the flow, it
> can do so without import issues. Currently I believe it doesn't handle
> instance methods (due to the complexity of needing an instance attached to
> that method).
>
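> Perhaps what you want is to provide a module-level function that can be
> found by name, like:
>
> def make_flow(*args, **kwargs):
>    # Module-level, so it can be re-imported by name when resuming.
>    return FlowFactory().flow_factory(*args, **kwargs)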
>
> Or make flow_factory a class or static method, which should have the
> same/similar effect.
>
> Hope that makes sense (more queries are fine and welcome!)
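To illustrate why the bound method fails the check while a module-level function passes, here is a small standard-library sketch. `is_reimportable` is a hypothetical, simplified stand-in for the kind of check TaskFlow performs (it is not TaskFlow's code): it looks the callable up again through its module and name, which works for a plain function but not for a method bound to an instance.

```python
import sys


class FlowFactory(object):
    def flow_factory(self, tmp):
        # Placeholder for building a real flow; returns a simple tuple here.
        return ('flow', tmp)


def make_flow(tmp):
    # Module-level wrapper: it can be found again by name, and it creates
    # the FlowFactory instance itself each time it is called.
    return FlowFactory().flow_factory(tmp)


def is_reimportable(fn):
    """Return True if fn can be found again via its module and name."""
    module = sys.modules.get(getattr(fn, '__module__', None))
    name = getattr(fn, '__name__', None)
    if module is None or name is None:
        return False
    # A bound method is a fresh object on every attribute access and is not
    # a module attribute, so this lookup cannot return it.
    return getattr(module, name, None) is fn
```

Passing `make_flow` rather than `FlowFactory().flow_factory` keeps the factory resolvable by name when the flow is reloaded later.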
>
>
>> I tried giving it as "FlowFactory().flow_factory", where FlowFactory is
>> my class name. It failed with the error below:
>> ValueError: Flow factory <bound method FlowFactory.flow_factory of
>> <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by
>> name __builtin__.instance.flow_factory
>>
>> Thanks again
>> -Kanthi
>>
>>
>> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlowja at fastmail.com>
>> wrote:
>>
>>     pn kk wrote:
>>
>>         Hi,
>>
>>         Thanks for the responses. Here is a small example:
>>
>>         from taskflow import task
>>         from taskflow.patterns import linear_flow as lf
>>
>>         def flow_factory(tmp):
>>               return lf.Flow('resume from backend example').add(
>>                   TestTask(name='first', test=tmp),
>>                   InterruptTask(name='boom'),
>>                   TestTask(name='second', test="second task"))
>>
>>
>>         class TestTask(task.Task):
>>               # Non-default arguments must come before defaulted ones.
>>               def __init__(self, name, test, provides=None, **kwargs):
>>                   self.test = test
>>                   super(TestTask, self).__init__(name, provides, **kwargs)
>>
>>               def execute(self, *args, **kwargs):
>>                   print('executing %s' % self)
>>                   return 'ok'
>>
>>         class InterruptTask(task.Task):
>>               def execute(self, *args, **kwargs):
>>                   # DO NOT TRY THIS AT HOME: suspends the module-level
>>                   # 'engine' created by load_from_factory() below.
>>                   engine.suspend()
>>
>>         I was searching for a way to reload the flow after a crash without
>>         passing the parameter "tmp" shown above. It looks like
>>         "load_from_factory" provides this.
>>
>>
>>     Thanks for the example. Yes, what you have found is one way to do this;
>>     there are a few other ways, but that is the main one that people should
>>     be using.
>>
>>
>>
>>         import taskflow.engines
>>
>>         engine = taskflow.engines.load_from_factory(
>>             flow_factory=flow_factory,
>>             factory_kwargs={"tmp": "test_data"},
>>             book=book, backend=backend)
>>         engine.run()
>>
>>         Now it suspends after running the interrupt task, and I can reload
>>         the flow from the saved factory method without passing the
>>         parameter again:
>>
>>         # Resume every flow saved in the logbook.
>>         for flow_detail_2 in book:
>>             engine2 = taskflow.engines.load_from_detail(flow_detail_2,
>>                                                         backend=backend)
>>             engine2.run()
>>
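As far as I understand TaskFlow's engine helpers, `load_from_detail` can rebuild the flow without `tmp` because `load_from_factory` saves a by-name reference to the factory, together with its arguments, in the flow detail. The sketch below mimics that idea with the standard library only; the JSON record layout and the `build_flow`, `save_factory_record` and `rebuild_from_record` helpers are hypothetical and are not TaskFlow's actual storage format.

```python
import importlib
import json


def build_flow(tmp):
    # Hypothetical module-level factory standing in for flow_factory().
    return ['first(%s)' % tmp, 'boom', 'second']


def save_factory_record(factory, factory_kwargs):
    """Serialize a by-name reference to the factory plus its kwargs."""
    name = '%s.%s' % (factory.__module__, factory.__name__)
    return json.dumps({'factory': {'name': name, 'kwargs': factory_kwargs}})


def rebuild_from_record(record):
    """Re-import the factory by name and call it with the saved kwargs."""
    info = json.loads(record)['factory']
    module_name, _, func_name = info['name'].rpartition('.')
    factory = getattr(importlib.import_module(module_name), func_name)
    return factory(**info['kwargs'])
```

The `tmp` value travels inside the saved record, so the caller does not have to supply it again on resume; this is also why the factory has to be importable by name.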
>>         Let me know if this is OK, or if there is a better approach to
>>         achieve this.
>>
>>
>>     There are a few other ways, but this one should be the currently
>>     recommended one.
>>
>>     An idea: if you didn't find this easy to figure out, do you want to
>>     update the docs (submit a review) so that others can figure it out
>>     more easily? I'm sure that would be appreciated by all.
>>
>>
>>         -Thanks
>>
>>
>>         On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow
>>         <harlowja at fastmail.com> wrote:
>>
>>              Hi there,
>>
>>              Michał is correct, it should be saved.
>>
>>              Do you have a small example of what you are trying to do? That
>>              will help determine whether what you are doing will be saved
>>              or not.
>>
>>              Or even just explaining what is being done would be fine too
>>              (more data/info for me to reason about what should be stored
>>              in your case).
>>
>>              Thanks,
>>
>>              Josh
>>
>>
>>              Michał Dulko wrote:
>>
>>                  On 01/26/2016 10:23 AM, pn kk wrote:
>>
>>                      Hi,
>>
>>                      I use TaskFlow for job management and am now trying
>>                      to persist the state of flows/tasks in MySQL so I can
>>                      recover in case of process crashes.
>>
>>                      I could see the state and the task results stored in
>>                      the database.
>>
>>                      Now I am looking for some way to store the input
>>                      parameters of the tasks.
>>
>>                      Please share your inputs to achieve this.
>>
>>                      -Thanks
>>
>>                  I played with that some time ago, and if I recall
>>                  correctly, input parameters should be available in the
>>                  flow's storage, which means they are also saved to the
>>                  DB. Take a look at the resume_workflows method in my old
>>                  PoC [1] (hopefully TaskFlow hasn't changed much since
>>                  then).
>>
>>                  [1]
>>                  https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py
>>
>>
>>
>> __________________________________________________________________________
>>                  OpenStack Development Mailing List (not for usage
>>         questions)
>>                  Unsubscribe:
>>         OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>         <
>> http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>         <
>> http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>         http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>
>