[openstack-dev] [TaskFlow] TaskFlow persistence
Joshua Harlow
harlowja at fastmail.com
Sat Mar 19 16:00:44 UTC 2016
Interesting error; that could be a bug, and perhaps we should make
upgrade more thread-safe (with a lock around upgrade). Can you open a bug at
bugs.launchpad.net/taskflow for that? We can then try to add said lock,
which should hopefully resolve what you are seeing (if it doesn't,
then the bug lies somewhere else).
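
Until such a lock exists in the library, a caller-side guard along these lines may be enough. This is only a minimal sketch, assuming all jobs run in one process: `UpgradeGuard` and `ensure_upgraded` are hypothetical names, while `get_connection()` and `upgrade()` are the calls from the snippet quoted below. With eventlet's monkey patching applied to the thread module, `threading.Lock` should behave as a green lock, so green threads wait their turn instead of entering the upgrade at the same time.

```python
import contextlib
import threading


class UpgradeGuard(object):
    """Hypothetical helper: run the schema upgrade once, one caller at a time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = False

    def ensure_upgraded(self, backend):
        # Only one (green) thread may hold the lock; the others block here
        # until the first upgrade has finished.
        with self._lock:
            if self._done:
                return False  # someone already upgraded; nothing to do
            with contextlib.closing(backend.get_connection()) as conn:
                conn.upgrade()
            self._done = True
            return True
```

One guard would be created at import time and each job would call `guard.ensure_upgraded(self.backend)` instead of upgrading directly. This does not help if several processes upgrade the same database at once.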
Thanks much!
-Josh
On 03/19/2016 08:45 AM, pnkk wrote:
> Hi Joshua,
>
> Thanks for all your inputs.
> We are using this feature successfully, but once in a while I see an issue
> related to concurrency.
>
> To give you a brief overview, we use eventlet, and every job runs in a
> separate green thread.
>
> In the job execution part, we use taskflow functionality and persist all
> the details of the job.
>
> There we get the backend as shown below. We run the upgrade every time a
> job is executed.
>
> backend_uri = cfg.CONF.db.sqlalchemy_url
> conf = {
>     'connection': backend_uri,
> }
> self.backend = backends.fetch(conf)
> with contextlib.closing(self.backend.get_connection()) as conn:
>     conn.upgrade()
>
> Now when two jobs start executing at the same time, I see the error below:
> Failed upgrading database version
> DBAPIError: (exceptions.RuntimeError) reentrant call inside
> <_io.BufferedReader name=14>
>
> We have monkey patched with eventlet; is that not supposed to take care of
> these concurrency issues?
>
> Below are the versions for related modules in use:
> eventlet==0.17.4
> taskflow==1.26.0
> SQLAlchemy==1.0.9
>
> Thanks,
> Kanthi
>
> On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow <harlowja at fastmail.com> wrote:
>
> pn kk wrote:
>
> Hi Joshua,
>
> Yes, sure, I will do that once I get some time away from my work.
>
> One last query (hopefully :) ): can the factory method be an instance
> method of a class?
>
>
> Instance methods are particularly hard to use (since they require an
> instance of an object to be useful), so I think the check you have hit
> is making sure that if the flow factory is called to recreate the
> flow, it can do so without import issues. Currently I
> believe it doesn't handle instance methods (due to the complexity of
> needing an instance attached to that method).
>
> Perhaps what you want is to provide a function that can be found by name, like:
>
> def make_flow_factory():
>     return FlowFactory().flow_factory
>
> Or make flow_factory a class or static method, which should have the
> same/similar effect.
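
To illustrate why the bound instance method is the odd one out, here is a small sketch in plain Python (no TaskFlow involved). `needs_instance` is a hypothetical helper, not the check TaskFlow performs; it only shows which kinds of callables carry an object instance along with them, and so cannot be found again from a name alone.

```python
import inspect


class FlowFactory(object):
    def instance_factory(self):
        return "flow built by an instance method"

    @staticmethod
    def static_factory():
        return "flow built by a static method"

    @classmethod
    def class_factory(cls):
        return "flow built by a class method"


def module_factory():
    return "flow built by a module-level function"


def needs_instance(factory):
    """Hypothetical check: True if the callable is bound to an object instance."""
    # A class method is also a bound method, but it is bound to the class
    # itself, which can be looked up by name; an instance cannot.
    return inspect.ismethod(factory) and not inspect.isclass(factory.__self__)
```

Under this sketch only `FlowFactory().instance_factory` is flagged; the static method, the class method and the module-level function are not.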
>
> Hope that makes sense (more queries are fine and welcome!)
>
>
> I tried giving it as "FlowFactory().flow_factory", where FlowFactory is
> my class name. It failed with the error below:
> ValueError: Flow factory <bound method FlowFactory.flow_factory of
> <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by
> name __builtin__.instance.flow_factory
>
> Thanks again
> -Kanthi
>
>
> On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow <harlowja at fastmail.com> wrote:
>
> pn kk wrote:
>
> Hi,
>
> Thanks for the responses. Here it is as a small example:
>
> from taskflow import task
> from taskflow.patterns import linear_flow as lf
>
>
> def flow_factory(tmp):
>     return lf.Flow('resume from backend example').add(
>         TestTask(name='first', test=tmp),
>         InterruptTask(name='boom'),
>         TestTask(name='second', test="second task"))
>
>
> class TestTask(task.Task):
>     # 'test' has to come before 'provides': a parameter without a default
>     # cannot follow one that has a default.
>     def __init__(self, name, test, provides=None, **kwargs):
>         self.test = test
>         super(TestTask, self).__init__(name, provides=provides, **kwargs)
>
>     def execute(self, *args, **kwargs):
>         print('executing %s' % self)
>         return 'ok'
>
>
> class InterruptTask(task.Task):
>     def execute(self, *args, **kwargs):
>         # DO NOT TRY THIS AT HOME
>         engine.suspend()
>
> I was searching for a way to reload the flow after a crash
> without passing the parameter "tmp" shown above.
> It looks like "load_from_factory" provides this.
>
>
> Thanks for the example. Yes, what you have found is one way to do
> this; there are a few other ways, but that is the main one
> that people should be using.
>
>
>
> engine = taskflow.engines.load_from_factory(
>     flow_factory=flow_factory,
>     factory_kwargs={"tmp": "test_data"},
>     book=book,
>     backend=backend)
> engine.run()
>
> Now it suspends after running the interrupt task. I can then reload
> the flow from the saved factory method without passing the parameter
> again:
> for flow_detail_2 in book:
>     engine2 = taskflow.engines.load_from_detail(flow_detail_2,
>                                                 backend=backend)
>     engine2.run()
>
> Let me know if this is OK, or whether there is a better approach to
> achieve this.
>
>
> There are a few other ways, but this one should be the currently
> recommended one.
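
The idea behind resuming from a saved factory can be shown with a toy sketch in plain Python. This is not TaskFlow's implementation: `FACTORIES`, `save_factory` and `rebuild_flow` are hypothetical names, the lookup table stands in for importing the factory by name, and the returned list stands in for a real flow. The point is only that the factory's name and keyword arguments are stored once, so a later process can rebuild the same flow without being handed "tmp" again.

```python
import json


def flow_factory(tmp):
    # Stand-in for building a real flow: just describe the tasks and inputs.
    return ["first:%s" % tmp, "boom", "second:second task"]


# Hypothetical lookup table standing in for "import the factory by name".
FACTORIES = {"flow_factory": flow_factory}


def save_factory(name, kwargs):
    """Serialize what is needed to call the factory again later."""
    return json.dumps({"name": name, "kwargs": kwargs})


def rebuild_flow(saved):
    """Recreate the flow from the saved record alone."""
    record = json.loads(saved)
    factory = FACTORIES[record["name"]]
    return factory(**record["kwargs"])
```

A first run would call `save_factory("flow_factory", {"tmp": "test_data"})` and persist the result; after a crash, `rebuild_flow(saved)` needs nothing but that record.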
>
> An idea: if you didn't find this very easy to figure out, do you want to
> update the docs (submit a review to update them) so that others
> can more easily figure it out? I'm sure that would be appreciated by
> all.
>
>
> -Thanks
>
>
> On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow <harlowja at fastmail.com> wrote:
>
> Hi there,
>
> Michał is correct, it should be saved.
>
> Do you have a small example of what you are trying to do? That
> will help determine whether what you are doing will be saved or not.
>
> Or even explaining what is being done would be fine too
> (more data/info for me to reason about what should be stored in your
> case).
>
> Thanks,
>
> Josh
>
>
> Michał Dulko wrote:
>
> On 01/26/2016 10:23 AM, pn kk wrote:
>
> Hi,
>
> I use taskflow for job management and am now trying to persist the state
> of flows/tasks in MySQL to recover in case of process crashes.
>
> I can see the state and the task results stored in the database.
>
> Now I am looking for some way to store the input parameters of the tasks.
>
> Please share your inputs to achieve this.
>
> -Thanks
>
> I've played with that some time ago and, if I recall correctly, input
> parameters should be available in the flow's storage, which means these
> are also saved to the DB. Take a look at the resume_workflows method in my
> old PoC [1] (hopefully TaskFlow hasn't changed much since then).
>
> [1] https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py
>
>
>
> __________________________________________________________________________
> OpenStack Development Mailing List (not for usage questions)
> Unsubscribe: OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
> http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev
>