Bharat's Digital Garden

Celery

Celery is an insanely powerful Python task scheduler; it is almost magical how much it can do with so little code.

A unit of work in Celery is called a task. A function can be turned into a task with the @shared_task or @app.task decorator (assuming you have already defined app, as shown in the documentation). A broker is needed to carry the task messages; supported brokers include SQS, RabbitMQ, and Redis. Tasks can be scheduled, run after a delay, or combined into really complex workflows.

What really interests me is the complex workflows; if all I wanted was task processing using a queue, I would just use SQS.

Hypothetical use case to demo Celery’s power

You have a service that sends emails from a CSV file. This hypothetical service lets people upload a CSV file with two columns, email and contents.

The job breaks down into a validation task, a verify-email task, a send-email task, and a final task that sends a success email back to the customer. The workflow might look something like this:

+-------------------------------------------+                                                 
|                                           |                                                 
|               validate_file               |                                                 
|                                           |                                                 
+-------------------------------------------+                                                 
         +-----------------------+                                                            
         |                       |                                                            
         |                       |                                                            
         |                       |                                                            
         |                       |                                                            
 +---------------+      +------------------+                                                  
 | verified_check|      |  verified_check  |                                                  
 |       |       |      |        |         |                                                  
 |       |       |      |        |         |                                                  
 |       v       |      |        v         |                                                  
 |  send_email   |      |    send_email    |                                                 
 +---------------+      +------------------+                                                  
         |                       |                                                            
         |                       |                                                            
         |                       |                                                            
         +-----------------------+                                                            
                     |                                                                        
                     |                                                                        
          +---------------------+                                                             
          | send_success_email  |                                                             
          +---------------------+                                                             
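
The validate_file step at the top of the diagram is not implemented anywhere in this note, so here is a minimal sketch of what it might do. The validation rules (an exact `email,contents` header and a crude `@` check) are my assumptions for illustration, not requirements from the use case; real address verification would belong to verified_check.

```python
import csv
import io


def validate_file(csv_text: str) -> list[dict[str, str]]:
    """Return the rows of a two-column CSV, or raise ValueError if it is malformed."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Assumed header: exactly the two columns named in the use case.
    if reader.fieldnames != ["email", "contents"]:
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    rows = []
    for row in reader:
        email = (row["email"] or "").strip()
        # Deliberately crude check (assumption): only reject obviously broken addresses.
        if "@" not in email:
            raise ValueError(f"line {reader.line_num}: invalid email {email!r}")
        rows.append({"email": email, "contents": row["contents"] or ""})
    return rows
```

The returned rows are what the rest of the workflow would fan out over, one chain per row.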

Although this is not hyper-complex, it is fairly complicated: execute a task, then execute a group of chains (each chain being two tasks), and once the whole group completes, execute something else. This is key: verified_check and send_email form a chain, so they need to be executed in sequence, but the chains themselves should be executed in parallel. The vocabulary is very important.

I love what the Celery authors call the part of Celery that takes care of designing these workflows: canvas. It is a canvas on which we can draw distributed workflows.

A canvas has many primitives; we will go over two of the important ones, chain and group.

Note that these are primitives, so they compose: chains can be grouped, groups can be chained, and so on.
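
To make the vocabulary concrete before bringing in Celery, here is a toy model of "a group of chains, then a final step" in plain Python. This is only a sketch of the semantics: run_chain and run_group are hypothetical helpers I made up, and threads stand in for workers, whereas Celery sends tasks to worker processes through a broker.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(steps, value):
    """Run steps in sequence, feeding each result into the next step."""
    return reduce(lambda acc, step: step(acc), steps, value)


def run_group(jobs):
    """Run independent zero-argument jobs in parallel; results keep input order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(job) for job in jobs]
        return [future.result() for future in futures]


def verified_check(email):
    return email


def send_email(email):
    return f"sent to {email}"


emails = ["a@gmail.com", "b@protonmail.com"]
# A group of chains: each chain is sequential, the chains run side by side.
results = run_group(
    [lambda e=e: run_chain([verified_check, send_email], e) for e in emails]
)
# The final step runs only after every chain in the group has finished.
summary = f"bulk send complete: {len(results)} emails"
```

The shape is the same one the Celery version builds: the output of verified_check flows into send_email inside each chain, and the completion step waits for the whole group.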

Here are our functions:

# file: tasks_email.py
from datetime import datetime

from celery import Celery, chain, group

app = Celery('tasks_email', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def process_email_file():
    list_of_emails = ['a@gmail.com', 'b@protonmail.com']
    final_chain = chain(
        group(chain(verified_check.s(i), send_email.s(datetime.now())) for i in list_of_emails),
        success_on_complete_email.si(),
    )
    final_chain.delay()
    return list_of_emails


@app.task
def verified_check(msg: str):
    print(f"verifying email {msg} at {datetime.now()}")
    return msg


@app.task
def send_email(msg: str, now: datetime):
    print(f"Sending email at {now} for {msg}")
    return msg


@app.task
def success_on_complete_email():
    print(f"Sending bulk send complete email to customer at {datetime.now()} !")

if __name__ == "__main__":
    process_email_file.delay()

To run this, first start the Celery worker, which will pick up the tasks and run them for you in the background:

celery -A tasks_email worker

and then run the file:

python tasks_email.py

Here is the output in my Celery worker instance:

[2020-07-02 00:21:54,564: WARNING/ForkPoolWorker-8] Calling tax payer at 2020-07-02 00:21:54.564389 for a@gmail.com
[2020-07-02 00:21:54,566: WARNING/ForkPoolWorker-1] Calling tax payer at 2020-07-02 00:21:54.566654 for b@protonmail.com
[2020-07-02 00:21:54,593: WARNING/ForkPoolWorker-8] Calling tax filing at 2020-07-02T00:21:54.526869 for b@protonmail.com
[2020-07-02 00:21:54,593: WARNING/ForkPoolWorker-7] Calling tax filing at 2020-07-02T00:21:54.526270 for a@gmail.com
[2020-07-02 00:21:54,606: WARNING/ForkPoolWorker-7] Sending on bulk send complete email at 2020-07-02 00:21:54.606306 !

Some notes:

That’s it. That was us painting on a distributed canvas!