[openstack-dev] [TaskFlow] TaskFlow persistence

Joshua Harlow harlowja at fastmail.com
Sat Mar 19 16:00:44 UTC 2016


Interesting error; that could be a bug, and perhaps we should make
upgrade more thread-safe (with a lock around the upgrade). Can u open a
bug @ bugs.launchpad.net/taskflow for that so we can try to add said
lock? That should hopefully resolve what u are seeing, although if it
doesn't, then the bug lies somewhere else.
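
In the meantime, a caller-side guard may be enough. The following is only
a sketch, not TaskFlow API: upgrade_once is a hypothetical helper, it
assumes the backend is the object returned by backends.fetch(conf) as in
the snippet quoted below, and it assumes eventlet's monkey patching has
swapped threading.Lock for a green-thread-aware lock. It runs the upgrade
at most once per process instead of once per job.

```python
import contextlib
import threading

# Process-wide guard; both names are hypothetical, not part of TaskFlow.
_upgrade_lock = threading.Lock()
_upgraded = False


def upgrade_once(backend):
    """Run backend schema upgrade at most once per process.

    Returns True if this call performed the upgrade, False if an
    earlier call already did. If upgrade() raises, the flag stays
    unset so a later caller can retry.
    """
    global _upgraded
    with _upgrade_lock:
        if _upgraded:
            return False
        # Same calls as in the original snippet: get a connection,
        # upgrade the schema, and always close the connection.
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()
        _upgraded = True
        return True
```

Each job would then call upgrade_once(self.backend) instead of opening its
own connection and upgrading, so concurrent jobs never run the migration
at the same time.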

Thanks much!

-Josh

On 03/19/2016 08:45 AM, pnkk wrote:
> Hi Joshua,
>
> Thanks for all your inputs.
> We are using this feature successfully, but occasionally I see an issue
> related to concurrency.
>
> To give you a brief overview, we use eventlet and every job runs in a
> separate green thread.
>
> In the job execution part, we use taskflow functionality and persist all
> the details of the job.
>
> There we try to get the backend as below. We run the upgrade every time
> a job is executed.
>
>          backend_uri = cfg.CONF.db.sqlalchemy_url
>          conf = {
>              'connection': backend_uri,
>          }
>          self.backend = backends.fetch(conf)
>          with contextlib.closing(self.backend.get_connection()) as conn:
>              conn.upgrade()
>
> Now when two jobs start executing at the same time, I see the error below:
> Failed upgrading database version
>    DBAPIError: (exceptions.RuntimeError) reentrant call inside <_io.BufferedReader name=14>
>
> We have monkey patched with eventlet; is that not supposed to take care
> of these concurrency issues?
>
> Below are the versions for related modules in use:
> eventlet==0.17.4
> taskflow==1.26.0
> SQLAlchemy==1.0.9
>
> Thanks,
> Kanthi
>
> On Fri, Feb 12, 2016 at 1:29 PM, Joshua Harlow
> <harlowja at fastmail.com> wrote:
>
>     pn kk wrote:
>
>         Hi Joshua,
>
>         Yes, sure will do that once I get some window out of my work.
>
>         One last query (hopefully :) ): can the factory method be an
>         instance method of a class?
>
>
>     Instance methods are particularly hard to use (since they require an
>     instance of an object to be useful); so I think the check u have hit
>     is making sure that if the flow-factory is called to recreate the
>     flow that it can do so without having import issues. Currently I
>     believe it doesn't handle instance methods (due to the complexity of
>     needing an instance attached to that method).
>
>     Perhaps what u want is to provide a function that can be found like:
>
>     def make_flow_factory():
>         return FlowFactory().flow_factory
>
>     Or make flow_factory a class or static method, which should have the
>     same/similar effect.
>
>     Hope that makes sense (more queries are fine and welcome!)
>
>
>         I tried giving it as "FlowFactory().flow_factory", where FlowFactory
>         is my class name. It failed with the error below:
>         ValueError: Flow factory <bound method FlowFactory.flow_factory of
>         <__main__.FlowFactory instance at 0x2b43b48>> is not reimportable by
>         name __builtin__.instance.flow_factory
>
>         Thanks again
>         -Kanthi
>
>
>         On Thu, Jan 28, 2016 at 12:29 AM, Joshua Harlow
>         <harlowja at fastmail.com> wrote:
>
>              pn kk wrote:
>
>                  Hi,
>
>                  Thanks for the responses. Putting it in a small example
>
>                  def flow_factory(tmp):
>                        return lf.Flow('resume from backend example').add(
>                            TestTask(name='first', test=tmp),
>                            InterruptTask(name='boom'),
>                            TestTask(name='second', test="second task"))
>
>
>                  class TestTask(task.Task):
>                        # "test" has to come before the defaulted "provides":
>                        # a non-default argument cannot follow a default one.
>                        def __init__(self, name, test, provides=None, **kwargs):
>                            self.test = test
>                            super(TestTask, self).__init__(
>                                name, provides=provides, **kwargs)
>                        def execute(self, *args, **kwargs):
>                            print('executing %s' % self)
>                            return 'ok'
>
>                  class InterruptTask(task.Task):
>                        def execute(self, *args, **kwargs):
>                            # DO NOT TRY THIS AT HOME
>                            engine.suspend()
>
>                  I was searching for a way to reload the flow after a crash
>                  without passing the parameter "tmp" shown above.
>                  It looks like "load_from_factory" provides this.
>
>
>              Thanks for the example, ya, this is one such way to do this that
>              u have found, there are a few other ways, but that is the main
>              one that people should be using.
>
>
>
>                  engine = taskflow.engines.load_from_factory(
>                      flow_factory=flow_factory,
>                      factory_kwargs={"tmp": "test_data"},
>                      book=book, backend=backend)
>                  engine.run()
>
>                  Now it suspends after running the interrupt task, and I can
>                  reload the flow from the saved factory method without passing
>                  the parameter again.
>                  for flow_detail_2 in book:
>                        engine2 = taskflow.engines.load_from_detail(
>                            flow_detail_2, backend=backend)
>
>                  engine2.run()
>
>                  Let me know if this is ok or is there a better approach to
>                  achieve this?
>
>
>              There are a few other ways, but this one should be the currently
>              recommended one.
>
>              An idea, do u want to maybe update (submit a review to update) the
>              docs, if u didn't find this very easy to figure out, so that others
>              can more easily figure it out? I'm sure that would be appreciated
>              by all.
>
>
>                  -Thanks
>
>
>                  On Wed, Jan 27, 2016 at 12:03 AM, Joshua Harlow
>                  <harlowja at fastmail.com> wrote:
>
>                       Hi there,
>
>                       Michał is correct, it should be saved.
>
>                       Do u have a small example of what u are trying to do, because
>                       that will help determine if what u are doing will be saved or
>                       whether it will not be.
>
>                       Or even possibly explaining what is being done would be fine
>                       too (more data/info for me to reason about what should be
>                       stored in your case).
>
>                       Thanks,
>
>                       Josh
>
>
>                       Michał Dulko wrote:
>
>                           On 01/26/2016 10:23 AM, pn kk wrote:
>
>                               Hi,
>
>                               I use taskflow for job management and am now trying to
>                               persist the state of flows/tasks in mysql to recover in
>                               case of process crashes.
>
>                               I could see the state and the task results stored in the
>                               database.
>
>                               Now I am looking for some way to store the input
>                               parameters of the tasks.
>
>                               Please share your inputs to achieve this.
>
>                               -Thanks
>
>                           I've played with that some time ago and, if I recall
>                           correctly, input parameters should be available in the
>                           flow's storage, which means these are also saved to the DB.
>                           Take a look at the resume_workflows method in my old PoC [1]
>                           (hopefully TaskFlow hasn't changed much since then).
>
>                           [1] https://review.openstack.org/#/c/152200/4/cinder/scheduler/manager.py
>
>
>
>         __________________________________________________________________________
>                           OpenStack Development Mailing List (not for usage questions)
>                           Unsubscribe: OpenStack-dev-request at lists.openstack.org?subject:unsubscribe
>         http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev


