[openstack-dev] [TaskFlow] TaskFlow persistence
Lingxian Kong
anlin.kong at gmail.com
Sun Mar 20 08:04:41 UTC 2016
Kanthi, sorry for chiming in, I suggest you may have a chance to take
a look at Mistral[1], which is the workflow as a service in
OpenStack(or without OpenStack).
[1]: http://docs.openstack.org/developer/mistral/
On Sun, Mar 20, 2016 at 5:35 AM, pnkk <pnkk2016 at gmail.com> wrote:
> Filed it at https://bugs.launchpad.net/taskflow/+bug/1559496
>
> Thanks,
> Kanthi
>
> On Sat, Mar 19, 2016 at 9:30 PM, Joshua Harlow <harlowja at fastmail.com>
> wrote:
>>
>> Interesting error, that could be a bug and perhaps we should ensure
>> upgrade is more thread-safe (with a lock on upgrade); can u open a bug @
>> bugs.launchpad.net/taskflow for that and we can try to add said lock (that
>> should hopefully resolve what u are seeing, although if it doesn't then the
>> bug lies somewhere else).
>>
>> Thanks much!
>>
>> -Josh
>>
>>
>> On 03/19/2016 08:45 AM, pnkk wrote:
>>>
>>> Hi Joshua,
>>>
>>> Thanks for all your inputs.
>>> We are using this feature successfully. But I rarely see an issue
>>> related to concurrency.
>>>
>>> To give you a brief, we use eventlets and every job runs in a separate
>>> eventlet thread.
>>>
>>> In the job execution part, we use taskflow functionality and persist all
>>> the details of the job.
>>>
>>> There we try to get the backend as below. We do upgrade everytime when a
>>> job is executed.
>>>
>>> backend_uri = cfg.CONF.db.sqlalchemy_url
>>> conf = {
>>> 'connection': backend_uri,
>>> }
>>> self.backend = backends.fetch(conf)
>>> with contextlib.closing(self.backend.get_connection()) as conn:
>>> conn.upgrade()
>>>
>>> Now when two jobs start executing at the same time, I see below error.
>>> Failed upgrading database version
>>> DBAPIError: (exceptions.RuntimeError) reentrant call inside
>>> <_io.BufferedReader name=14>
>>>
>>> We have monkey patched eventlet, is it not supposed to take care of
>>> these concurrency issues?
>>>
>>> Below are the versions for related modules in use:
>>> eventlet==0.17.4
>>> taskflow==1.26.0
>>> SQLAlchemy==1.0.9
>>>
>>> Thanks,
>>> Kanthi
>>>
>>> On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlowja at fastmail.com
>>> <mailto:harlowja at fastmail.com>> wrote:
>>>
>>> pn kk wrote:
>>>
>>> Hi Joshua,
>>>
>>> Yes, sure will do that once I get some window out of my work.
>>>
>>> One last query(hopefully :) ) , can the factory method be an
>>> instance
>>> method of a class?
>>>
>>>
>>> Instance methods are particularly hard to use (since they require an
>>> instance of an object to be useful); so I think the check u have hit
>>> is making sure that if the flow-factory is called to recreate the
>>> flow that it can do so without having import issues. Currently I
>>> believe it doesn't handle instance methods (due to the complexity of
>>> needing an instance attached to that method).
>>>
>>> Perhaps what u want is to provide a function that can be found like:
>>>
>>> def make_flow_factory():
>>> return FlowFactory().flow_factory
>>>
>>> Or make flow_factory a class or static method, which should have the
>>> same/similar effect.
>>>
>>> Hope that makes sense (more queries are fine and welcome!)
>>>
>>>
>>> I tried giving it as "FlowFactory().flow_factory", where
>>> FlowFactory is
>>> my class name. It failed with below error:
>>> ValueError: Flow factory <bound method FlowFactory.flow_factory
>>> of
>>> <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable
>>> by
>>> name __builtin__.instance.flow_factory
>>>
>>> Thanks again
>>> -Kanthi
>>>
>>>
>>> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow
>>> <harlowja at fastmail.com <mailto:harlowja at fastmail.com>
>>> <mailto:harlowja at fastmail.com <mailto:harlowja at fastmail.com>>>
>>> wrote:
>>>
>>> pn kk wrote:
>>>
>>> Hi,
>>>
>>> Thanks for the responses. Putting it in a small example
>>>
>>> def flow_factory(tmp):
>>> return lf.Flow('resume from backend example').add(
>>> TestTask(name='first', test=tmp),
>>> InterruptTask(name='boom'),
>>> TestTask(name='second', test="second task"))
>>>
>>>
>>> class TestTask(task.Task):
>>> def __init__(self, name, provides=None, test,
>>> **kwargs):
>>> self.test=test
>>> super(TestTask, self).__init__(name,
>>> provides, **kwargs)
>>> def execute(self, *args, **kwargs):
>>> print('executing %s' % self)
>>> return 'ok'
>>>
>>> class InterruptTask(task.Task):
>>> def execute(self, *args, **kwargs):
>>> # DO NOT TRY THIS AT HOME
>>> engine.suspend()
>>>
>>> I was searching for a way in which I can reload the
>>> flow after crash
>>> without passing the parameter "tmp" shown above
>>> Looks like "load_from_factory" gives this provision.
>>>
>>>
>>> Thanks for the example, ya, this is one such way to do this
>>> that u
>>> have found, there are a few other ways, but that is the
>>> main one
>>> that people should be using.
>>>
>>>
>>>
>>> engine =
>>>
>>> taskflow.engines.load_from_factory(flow_factory=flow_factory,
>>> factory_kwargs={"tmp":"test_data"}, book=book,
>>> backend=backend)
>>> engine.run()
>>>
>>> Now it suspends after running interrupt task, I can now
>>> reload
>>> the flow
>>> from the saved factory method without passing parameter
>>> again.
>>> for flow_detail_2 in book:
>>> engine2 =
>>> taskflow.engines.load_from_detail(flow_detail_2,
>>> backend=backend)
>>>
>>> engine2.run()
>>>
>>> Let me know if this is ok or is there a better approach
>>> to
>>> achieve this?
>>>
>>>
>>> There are a few other ways, but this one should be the
>>> currently
>>> recommended one.
>>>
>>> An idea, do u want to maybe update (submit a review to
>>> update) the
>>> docs, if u didn't find this very easy to figure out so that
>>> others
>>> can more easily figure it out. I'm sure that would be
>>> appreciated by
>>> all.
>>>
>>>
>>> -Thanks
>>>
>>>
>>> On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow
>>> <harlowja at fastmail.com <mailto:harlowja at fastmail.com>
>>> <mailto:harlowja at fastmail.com <mailto:harlowja at fastmail.com>>
>>> <mailto:harlowja at fastmail.com
>>> <mailto:harlowja at fastmail.com> <mailto:harlowja at fastmail.com
>>> <mailto:harlowja at fastmail.com>>>>
>>>
>>> wrote:
>>>
>>> Hi there,
>>>
>>> Michał is correct, it should be saved.
>>>
>>> Do u have a small example of what u are trying to
>>> do
>>> because that
>>> will help determine if what u are doing will be
>>> saved or
>>> whether it
>>> will not be.
>>>
>>> Or even possibly explaining what is being done
>>> would be fine to
>>> (more data/info for me to reason about what should
>>> be
>>> stored in your
>>> case).
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>>
>>> Michał Dulko wrote:
>>>
>>> On 01/26/2016 10:23 AM, pn kk wrote:
>>>
>>> Hi,
>>>
>>> I use taskflow for job management and now
>>> trying to
>>> persist
>>> the state
>>> of flows/tasks in mysql to recover incase
>>> of
>>> process crashes.
>>>
>>> I could see the state and the task results
>>> stored
>>> in the
>>> database.
>>>
>>> Now I am looking for some way to store the
>>> input
>>> parameters
>>> of the tasks.
>>>
>>> Please share your inputs to achieve this.
>>>
>>> -Thanks
>>>
>>> I've played with that some time ago and if I
>>> recall
>>> correctly input
>>> parameters should be available in the flow's
>>> storage, which
>>> means these
>>> are also saved to the DB. Take a look on
>>> resume_workflows method
>>> on my
>>> old PoC [1] (hopefully TaskFlow haven't
>>> changed much
>>> since then).
>>>
>>> [1]
>>>
>>> https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py
>>>
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for
>>> usage
>>> questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage
>>> questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage
>>> questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>>
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>>
>>> <http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe>
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>>
>>>
>>>
>>>
>>> __________________________________________________________________________
>>> OpenStack Development Mailing List (not for usage questions)
>>> Unsubscribe:
>>> OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>>>
>>
>> __________________________________________________________________________
>> OpenStack Development Mailing List (not for usage questions)
>> Unsubscribe: OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>
>
>
> __________________________________________________________________________
> OpenStack Development Mailing List (not for usage questions)
> Unsubscribe: OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>
--
Regards!
-----------------------------------
Lingxian Kong
More information about the OpenStack-dev
mailing list