<div dir="ltr">Thanks for the nice documentation.<div><br><div>To my knowledge, Celery is widely used for distributed task processing. It fits our requirement perfectly: we want our API server to return an immediate response to the user while the long-running task runs in the background. Celery also offers flexibility in worker types (process-based workers, which can also get around GIL limitations, eventlet, ...) and supports good message brokers (RabbitMQ, Redis, ...).</div><div><br></div><div>We use both Celery and TaskFlow for our core processing to leverage the benefits of both. TaskFlow provides useful primitives (execute, revert, pre/post hooks, ...) that take load off the application.</div><div><br></div><div>As for the actual issue, I found one way to solve it: Celery's "retry" option. Combined with late acknowledgement (acks_late), this makes the application highly fault tolerant.</div><div><br></div><div><a href="http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry">http://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry</a><br></div><div><br></div><div>Regards,</div><div>Kanthi</div><div><br></div></div></div><div class="gmail_extra"><br><div class="gmail_quote">On Sat, May 28, 2016 at 1:51 AM, Joshua Harlow <span dir="ltr">&lt;<a href="mailto:harlowja@fastmail.com" target="_blank">harlowja@fastmail.com</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">It seems like you could just use <a href="http://docs.openstack.org/developer/taskflow/jobs.html" rel="noreferrer" target="_blank">http://docs.openstack.org/developer/taskflow/jobs.html</a> (it appears that you may not be?); when a job fails, it would then be worked on by a different job consumer.<br>
<br>
Have you looked at those? It almost appears that you are using Celery as a job distribution system (similar to what the jobs.html link above describes). Is that roughly correct? I haven't seen anyone try this, so I'm curious how you are using it and what choices led you to it.<br>
<br>
-Josh<br>
<br>
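The jobs.html page linked above describes TaskFlow jobboards: a job is posted once, claimed by one consumer at a time, and, if that consumer abandons it or goes away, becomes claimable again instead of being lost. The toy in-memory model below only illustrates that lifecycle. ToyJobBoard and its owner_lost() hook are hypothetical; the method names mirror the jobboard operations (post, claim, consume, abandon), but this is not the TaskFlow API, and real boards detect a dead owner on their own (the ZooKeeper-backed board, for example, relies on ephemeral nodes).<br>
<pre>
```python
class ToyJobBoard:
    """In-memory model of a jobboard's job lifecycle (illustration only)."""

    def __init__(self):
        self._owners = {}  # job name -> owner name, or None while unclaimed
        self.completed = []

    def post(self, name):
        # A new job starts out unclaimed.
        self._owners[name] = None

    def unclaimed(self):
        return [name for name, owner in self._owners.items() if owner is None]

    def claim(self, name, who):
        # Only an existing, unclaimed job can be claimed.
        if name not in self._owners or self._owners[name] is not None:
            raise RuntimeError("%s cannot be claimed" % name)
        self._owners[name] = who

    def _check_owner(self, name, who):
        if self._owners.get(name) != who:
            raise RuntimeError("%s is not owned by %s" % (name, who))

    def consume(self, name, who):
        # The owner finished the job: it leaves the board for good.
        self._check_owner(name, who)
        del self._owners[name]
        self.completed.append(name)

    def abandon(self, name, who):
        # The owner gives the job back (e.g. after a failed run) so that
        # another consumer can claim it and try again.
        self._check_owner(name, who)
        self._owners[name] = None

    def owner_lost(self, who):
        # Hypothetical hook: a real board notices a dead owner by itself;
        # here the caller reports it. All of that owner's jobs are released.
        for name, owner in self._owners.items():
            if owner == who:
                self._owners[name] = None
```
</pre>
In this model a reboot of node-a only releases its claim; another node can then claim job-1, and a failed run can be handed back with abandon() instead of being dropped.<br>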
pnkk wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><span class="">
To be specific, we hit this issue when the node running our service was<br>
rebooted.<br>
Our solution is designed so that every job is a Celery task, and inside<br>
each Celery task we create a TaskFlow flow.<br>
<br>
We enabled late acks (acks_late) in Celery (with RabbitMQ as the message<br>
broker), so if our service/node goes down, another healthy service can<br>
pick up the job and complete it.<br>
This works fine, but we just hit a rare case where the node was rebooted<br>
right while TaskFlow was writing an update to the database.<br>
<br>
In this case, an exception is raised and the job is marked as failed.<br>
Since the task is complete (with a failure), the message is removed from<br>
RabbitMQ and no other worker can process it.<br>
Can TaskFlow handle such I/O errors gracefully, or should the application<br>
catch this exception? If the application has to handle it, what happens<br>
to the database transaction that failed just as the node was rebooted?<br>
Who will retry that transaction?<br>
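<br>
The failure mode described above can be modelled in a few lines. With late acknowledgement, the broker only redelivers a message if the worker dies before acking it; a task that raises still counts as completed, so its message is acked and dropped. What keeps the job alive is publishing a fresh copy of the message before the failing delivery is acked, which is roughly what Celery does with acks_late=True on the task plus raise self.retry(exc=exc, countdown=...) inside a bind=True task. The sketch below is a stdlib-only toy: ToyBroker, WorkerCrashed and the task functions are hypothetical and do not use Celery or RabbitMQ.<br>
<pre>
```python
from collections import deque


class WorkerCrashed(BaseException):
    """Hypothetical marker for 'the worker process died' (e.g. a reboot)."""


class ToyBroker:
    """Tiny in-memory queue with late-ack semantics (not RabbitMQ/Celery)."""

    def __init__(self):
        self.queue = deque()
        self.acked = []

    def publish(self, msg):
        self.queue.append(msg)

    def deliver_late_ack(self, task):
        """Hand the next message to task and ack only after it finishes."""
        msg = self.queue.popleft()
        try:
            task(msg, self)
        except WorkerCrashed:
            # Died before acking: the broker redelivers the message.
            self.queue.appendleft(msg)
            return "requeued"
        except Exception:
            # The task completed, albeit with a failure: it is still acked,
            # so the message is gone for good.
            self.acked.append(msg)
            return "acked (task failed)"
        self.acked.append(msg)
        return "acked"


def flaky_db_write(msg):
    # Hypothetical DB write that fails only on the first attempt.
    if msg["retries"] == 0:
        raise IOError(5, "Input/output error")  # IOError is OSError on Py3


def task_without_retry(msg, broker):
    flaky_db_write(msg)


def task_with_retry(msg, broker, max_retries=3):
    try:
        flaky_db_write(msg)
    except IOError:
        if msg["retries"] >= max_retries:
            raise
        # Publish a fresh copy before this delivery is acked, which is
        # roughly what Celery's self.retry() does.
        broker.publish(dict(msg, retries=msg["retries"] + 1))


def task_during_reboot(msg, broker):
    raise WorkerCrashed()
```
</pre>
Only the crash case is redelivered by the broker; the I/O error case survives only because the retrying task re-queues the job itself before its own delivery is acked.<br>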
<br>
Thanks,<br>
Kanthi<br>
<br>
On Fri, May 27, 2016 at 5:39 PM, pnkk &lt;<a href="mailto:pnkk2016@gmail.com" target="_blank">pnkk2016@gmail.com</a>&gt; wrote:<br></span><div><div class="h5">
<br>
Hi,<br>
<br>
While the TaskFlow engine was executing a job, the execution failed due<br>
to an I/O error (traceback pasted below).<br>
<br>
2016-05-25 19:45:21.717 7119 ERROR<br>
taskflow.engines.action_engine.engine 127.0.1.1 [-] Engine<br>
execution has failed, something bad must of happened (last 10<br>
machine transitions were [('SCHEDULING', 'WAITING'), ('WAITING',<br>
'ANALYZING'), ('ANALYZING', 'SCHEDULING'), ('SCHEDULING',<br>
'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING', 'SCHEDULING'),<br>
('SCHEDULING', 'WAITING'), ('WAITING', 'ANALYZING'), ('ANALYZING',<br>
'GAME_OVER'), ('GAME_OVER', 'FAILURE')])<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine Traceback (most recent call last):<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/engine.py",<br>
line 269, in run_iter<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
failure.Failure.reraise_if_any(memory.failures)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",<br>
line 336, in reraise_if_any<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine failures[0].reraise()<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/types/failure.py",<br>
line 343, in reraise<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine six.reraise(*self._exc_info)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",<br>
line 94, in schedule<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
futures.add(scheduler.schedule(atom))<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/scheduler.py",<br>
line 67, in schedule<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return<br>
self._task_action.schedule_execution(task)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",<br>
line 99, in schedule_execution<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine self.change_state(task,<br>
states.RUNNING, progress=0.0)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/engines/action_engine/actions/task.py",<br>
line 67, in change_state<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br></div></div>
self._storage.set_atom_state(task.name, state)<div><div class="h5"><br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/fasteners/lock.py",<br>
line 85, in wrapper<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return f(self, *args,<br>
**kwargs)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",<br>
line 486, in set_atom_state<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
self._with_connection(self._save_atom_detail, source, clone)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",<br>
line 341, in _with_connection<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return functor(conn,<br>
*args, **kwargs)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/storage.py",<br>
line 471, in _save_atom_detail<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
original_atom_detail.update(conn.update_atom_details(atom_detail))<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/taskflow/persistence/backends/impl_sqlalchemy.py",<br>
line 427, in update_atom_details<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine row = conn.execute(q).first()<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",<br>
line 914, in execute<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return meth(self,<br>
multiparams, params)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",<br>
line 323, in _execute_on_connection<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return<br>
connection._execute_clauseelement(self, multiparams, params)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py",<br>
line 1003, in _execute_clauseelement<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
inline=len(distilled_params) > 1)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File "&lt;string&gt;", line 1, in<br>
&lt;lambda&gt;<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",<br>
line 494, in compile<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return<br>
self._compiler(dialect, bind=bind, **kw)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",<br>
line 500, in _compiler<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return<br>
dialect.statement_compiler(dialect, self, **kw)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",<br>
line 392, in __init__<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine Compiled.__init__(self,<br>
dialect, statement, **kwargs)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",<br>
line 190, in __init__<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine self.string =<br>
self.process(self.statement, **compile_kwargs)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",<br>
line 213, in process<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return<br>
obj._compiler_dispatch(self, **kwargs)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/visitors.py",<br>
line 81, in _compiler_dispatch<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine return meth(self, **kw)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",<br>
line 1579, in visit_select<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine for name, column in<br>
select._columns_plus_names<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/sqlalchemy/sql/compiler.py",<br>
line 1347, in _label_select_column<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine<br>
add_to_result_map=add_to_result_map<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/celery/apps/worker.py",<br>
line 288, in _handle_request<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine safe_say('worker: {0}<br>
shutdown (MainProcess)'.format(how))<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine File<br>
"/opt/nso/nso-1.1223-default/nfvo-0.8.0.dev1438/.venv/local/lib/python2.7/site-packages/celery/apps/worker.py",<br>
line 73, in safe_say<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine print('\n{0}'.format(msg),<br>
file=sys.__stderr__)<br>
2016-05-25 19:45:21.717 7119 TRACE<br>
taskflow.engines.action_engine.engine IOError: [Errno 5]<br>
Input/output error<br>
2016-05-25 19:45:21.717 7119 TRACE taskflow.engines.action_engine.engine<br>
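<br>
Reading the traceback above, the innermost frames belong to Celery's worker (celery/apps/worker.py, _handle_request calling safe_say), not to the MySQL driver: it looks as if the reboot's shutdown signal arrived while SQLAlchemy was compiling a statement inside update_atom_details, and the handler's write to sys.__stderr__ failed with EIO. The stdlib-only sketch below (assumes Python 3.8+ on a POSIX system, run in the main thread; all function names are hypothetical stand-ins) shows how an exception raised inside a Python signal handler surfaces in whatever code the main thread happened to be running.<br>
<pre>
```python
import errno
import signal
import traceback


def shutdown_handler(signum, frame):
    # Stand-in for a worker's shutdown handler whose write to a terminal
    # that has already gone away fails with EIO.
    raise IOError(errno.EIO, "Input/output error")


def save_atom_state():
    # Stand-in for unrelated work (e.g. compiling a SQL statement) that the
    # main thread happens to be doing when the signal arrives.
    signal.raise_signal(signal.SIGTERM)  # simulate the reboot's SIGTERM
    return "saved"


def run():
    previous = signal.signal(signal.SIGTERM, shutdown_handler)
    try:
        return save_atom_state()
    except OSError as exc:
        # The handler's error surfaces inside save_atom_state's frame.
        names = [f.name for f in traceback.extract_tb(exc.__traceback__)]
        return exc.errno, names
    finally:
        signal.signal(signal.SIGTERM,
                      previous if previous is not None else signal.SIG_DFL)
```
</pre>
The frame names end with save_atom_state followed by shutdown_handler, the same shape as the SQLAlchemy frames followed by the Celery worker frames above.<br>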
<br>
There could be a transient network issue that prevents TaskFlow<br>
from reaching the MySQL node.<br>
Can you please suggest a graceful way to handle it and continue<br>
processing the execution?<br>
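<br>
If the error really is a transient network problem, one generic option is to retry the whole operation a bounded number of times with exponential backoff. The stdlib-only sketch below assumes a hypothetical run_once callable (for instance one that loads and runs the TaskFlow engine for the job); whether a re-run resumes from the persisted atom states or starts over depends on the persistence setup, so the tasks should be safe to repeat. The sleep parameter exists only so the delays can be checked without waiting.<br>
<pre>
```python
import time


def run_with_retries(run_once, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call run_once() and retry it on transient I/O errors.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
    and re-raises the last error once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return run_once()
        except OSError:  # IOError is an alias of OSError on Python 3
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
```
</pre>
Catching every OSError is deliberately broad here; a real implementation would narrow it to the errors known to be transient, such as the database driver's disconnect errors.<br>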
<br>
Thanks,<br>
Kanthi<br>
<br>
<br></div></div>
__________________________________________________________________________<br>
OpenStack Development Mailing List (not for usage questions)<br>
Unsubscribe: <a href="http://OpenStack-dev-request@lists.openstack.org?subject:unsubscribe" rel="noreferrer" target="_blank">OpenStack-dev-request@lists.openstack.org?subject:unsubscribe</a><br>
<a href="http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev" rel="noreferrer" target="_blank">http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev</a><br>
</blockquote>
</blockquote></div><br></div>