Celery Primitives: Basics of Building Distributed Workflows in Celery

Celery is a powerful distributed processing tool in the Python ecosystem that can be used to build complex workflows across distributed nodes (machines).
Those workflows are built from three primitives: group, chain, and chord. This article covers the basic usage of each primitive, then uses them in the later parts to build a map-reduce workflow.
1. Group
Group is the primitive for running tasks in parallel: a large task is broken into smaller chunks that can execute concurrently. Think of 5 copies of you reading a 15-page PDF, where each copy reads 3 pages, i.e. 3 pages per copy. That's where group shines: if you have tasks that can execute in parallel, use a group.

# grouping:
group([<chain1>, <chain2>, <chain3>])

# Single Task
# ------|        |----------|--------------|
#       |--------| Read Pdf | Process Text |----> chain1
#       |        |----------|--------------|
# Task  |--------| Read Pdf | Process Text |----> chain2
#       |        |----------|--------------|
#       |--------| Read Pdf | Process Text |----> chain3
# ------|        |----------|--------------|

from celery import shared_task, group, chain

@shared_task
def process_pdf_task(document_id):
    pages = list(range(1, 16))  # the 15 page numbers of the PDF (assumed here)
    read_process_text_chains = [
        chain(
            read_pdf.s(document_id, page_chunk),
            process_text.s()
        ) for page_chunk in chunk_list(pages, 5)
    ]
    # group of chains of tasks, note => read_process_text_chains is a list
    task_group = group(read_process_text_chains)
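The helper `chunk_list` is used above but never defined. A minimal sketch of what such a helper might look like (hypothetical, not part of Celery; it just splits a list into fixed-size slices):

```python
def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

pages = list(range(1, 16))     # a 15-page PDF
chunks = chunk_list(pages, 5)  # -> 3 chunks of 5 page numbers each
```

With 15 pages and a chunk size of 5, the list comprehension in the task body produces exactly 3 chains, one per chunk.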
2. Chains
Chaining is the workflow primitive that lets us link tasks so they run in sequence. For example, suppose task B needs to be executed right after the completion of task A. Here task A and task B are chained together.

Here the task Process Text starts right after an agent finishes reading its 5 pages of the PDF; three such agents read in parallel. As soon as one agent completes reading its text, that text is handed to the next agent in the chain, Process Text. Each chain therefore reads and processes a 5-page slice of the PDF.
Later, we will need to merge or combine the output from each of those parallel chains, each of which first does Read PDF and then Process Text, i.e. 3 agents reading and processing 5 pages each independently.
# chaining:
chain(<read_pdf>, <process_text>)  # ----> |read_pdf| ---> |process_text|

@shared_task
def process_pdf_task(document_id):
    pages = list(range(1, 16))  # the 15 page numbers of the PDF (assumed here)
    # chains of agents reading 5 pages each, i.e. 3 agents => 3 * 5 = 15
    read_process_text_chains = [
        chain(
            read_pdf.s(document_id, page_chunk),
            process_text.s()
        ) for page_chunk in chunk_list(pages, 5)
    ]
    # group of chains of tasks, note => read_process_text_chains is a list of chains
    task_group = group(read_process_text_chains)
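Conceptually, a chain just feeds each task's return value into the first argument of the next task's signature. A plain-Python sketch of that data flow, with hypothetical stand-in bodies for `read_pdf` and `process_text` (no broker needed):

```python
def read_pdf(document_id, pages):
    # stand-in: pretend each page yields one line of extracted text
    return [f"page-{p}" for p in pages]

def process_text(lines):
    # stand-in: "process" the text by joining the lines into one string
    return " ".join(lines)

# chain(read_pdf.s("doc-1", [1, 2, 3]), process_text.s()) behaves like:
result = process_text(read_pdf("doc-1", [1, 2, 3]))  # -> "page-1 page-2 page-3"
```

The `.s()` signatures defer the calls; Celery wires the output of `read_pdf` into `process_text` when a worker executes the chain.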
3. Chords
Now we need to combine the text we got from the different agents: each agent hands back a piece of processed text.

So here we have a MergeResult agent that merges the results obtained from each of the 3 agents reading and processing text. Chords are thus used for the reduce step of a workflow: merging the results from the workers that previously ran in parallel.
chord([<task1>, <task2>, <task3>], <merge_task>)
# --------------|
# process_text  |
# --------------|        |--------------|
# process_text  |--------| merge_result |
# --------------|        |--------------|
# process_text  |
# --------------|
# results of tasks 1, 2, 3 are combined by merge_result
@shared_task
def process_pdf_task(document_id):
    pages = list(range(1, 16))  # the 15 page numbers of the PDF (assumed here)
    # chains of agents reading 5 pages each, i.e. 3 agents => 3 * 5 = 15
    read_process_text_chains = [
        chain(
            read_pdf.s(document_id, page_chunk),
            process_text.s()
        ) for page_chunk in chunk_list(pages, 5)
    ]
    # group of chains of tasks, note => read_process_text_chains is a list
    task_group = group(read_process_text_chains)
    # combine the group's results into a single chord => reduce
    merge_result = chord(
        task_group,
        combine_result.s(document_id)
    )
    # Step 4: Execute the workflow asynchronously
    result = merge_result.apply_async()
So, in this example task flow we have a group of 3 tasks, each running a chain of 2 tasks, whose results are finally combined into a single result by a chord. Grouping thus acts as the map while the chord acts as the reduce.
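Stripped of the Celery machinery, the whole workflow is just a map followed by a reduce. A broker-free sketch of the same flow (all task bodies here are hypothetical stand-ins, with `process_text` simply counting the pages it received):

```python
def chunk_list(items, size):
    # split the page list into fixed-size chunks (assumed helper)
    return [items[i:i + size] for i in range(0, len(items), size)]

def read_pdf(document_id, pages):
    return [f"page-{p}" for p in pages]  # stand-in for PDF extraction

def process_text(lines):
    return len(lines)                    # stand-in: count pages processed

def combine_result(results, document_id):
    return {"document": document_id, "pages": sum(results)}

pages = list(range(1, 16))
# group of chains => map: each 5-page chunk is read and processed independently
mapped = [process_text(read_pdf("doc-1", chunk)) for chunk in chunk_list(pages, 5)]
# chord => reduce: merge the partial results into one
reduced = combine_result(mapped, "doc-1")  # -> {'document': 'doc-1', 'pages': 15}
```

In real Celery, the three mapped entries would be computed concurrently on worker nodes, and `combine_result` would only fire once all of them finished.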