This is how the taskset callback is implemented:
By default the synchronization step is implemented by having a recurring task poll the completion of the taskset every second, applying the subtask when ready.
Example implementation:
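    def unlock_chord(taskset, callback, interval=1, max_retries=None):
        # Apply the callback with the joined results once every task is done.
        if taskset.ready():
            return subtask(callback).delay(taskset.join())
        # Otherwise check again after `interval` seconds.
        unlock_chord.retry(countdown=interval, max_retries=max_retries)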
This is used by all result backends except Redis, which increments a counter after each task in the header and applies the callback when the counter reaches the number of tasks in the set.
The Redis approach is a much better solution, but not easily implemented in other backends (suggestions welcome!)
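To make the counter idea concrete, here is a minimal in-memory sketch of it. This is not the Redis backend's code: the class `CounterChord` and its method `on_part_return` are hypothetical names chosen for illustration, and a lock on a plain Python attribute stands in for the atomic counter that a real backend would keep in Redis.

```python
import threading


class CounterChord:
    """In-memory stand-in for counter-based taskset synchronization.

    Hypothetical class for illustration only: a real backend would keep
    the counter in Redis rather than in a Python attribute.
    """

    def __init__(self, total, callback):
        self.total = total              # number of tasks in the header
        self.callback = callback        # called once with all results
        self.results = {}
        self.count = 0
        self._lock = threading.Lock()   # emulates an atomic increment

    def on_part_return(self, task_id, result):
        """Record one finished header task; fire the callback on the last one."""
        with self._lock:
            self.results[task_id] = result
            self.count += 1
            is_last = self.count == self.total
        if is_last:
            # Pass the results to the callback ordered by task id.
            ordered = [self.results[key] for key in sorted(self.results)]
            return self.callback(ordered)
        return None


chord = CounterChord(total=3, callback=sum)
outputs = [chord.on_part_return(i, i * 10) for i in range(3)]
print(outputs)  # [None, None, 30]
```

No task has to poll here: each finishing task does one increment, and only the task that brings the counter up to the header size triggers the callback.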