Celery

Celery est utilisé pour lancer des tasks

  • qui peuvent durer longtemps (ex : génération de fichier d’export)

  • qui peuvent être lancées en arrière plan, l’utilisateur n’attendant pas de retour (ex : génération des comptes de résultat)

  • qui peuvent être lancées en arrière plan après la réussite d’une requête de l’utilisateur (ex : génération d’un pdf à la validation d’une facture)

Fonctionnement en arrière plan

Une tâche celery est appelée de manière asynchrone via un appel à apply_async? La base de données redis reçoit alors la task qui est prise en charge par le service worker de endi_celery.

Si enDI attend un retour de la part du service celery, un modèle de base de données mysql (héritant de endi_celery.models::Job) est utilisé pour retourner le résultat de la tâche celery à enDI

Cas 1 : l’utilisateur attend un retour

Endi lance une task asynchrone. Cette task doit être décorée avec pyramid_celery::celery_app.task.

Le retour de la tâche est transmis à endi via la base de données mysql.

from pyramid_celery import celery_app


# bind=True permet de bénéficier des données d'environnement pyramid
# (registry, request...) au sein de la task via self.request
@celery_app.task(bind=True)
def mytask(
    self,
    job_id,
    params
):
    ...

Afin d’en récupérer le résultat, on crée une instance d’un modèle héritant de endi_celery.models::Job On passe l’identifiant à la task qui écrit son retour (par exemple un chemin sur disque ou un message d’erreur) dans la base de données.

Un mixin endi.views::AsyncJobMixin regroupe les actions nécessaires pour l’instanciation d’un job et le lancement de la tâche.

class MyView(BaseView, AsyncJobMixin):
    def __call__(self):
        ...
        celery_error_resp = self.is_celery_alive()
        if celery_error_resp:
            return celery_error_resp
        else:
            job_result = self.initialize_job_result(Job)
            celery_job = mytask.apply_async(
                args=[job_result.id, myparams]
            )
            return self.redirect_to_job_watch(celery_job, job_result)

Cas 2 : Une task longue est lancée par l’utilisateur

Endi lance une task asynchrone. Cette task doit être décorée avec pyramid_celery::celery_app.task.

from pyramid_celery import celery_app

@celery_app.task
def mytask(param1):
    ...

Elle peut être lancée comme une task celery classique.

from endi_celery.tasks.mymodule import mytask

def myview(context, request):
    ...
    mytask.apply_async(args=[myparam1], **celery_options)
    ...

Cas 3 : Lancement d’une task dans la continuité de la transaction courante

Certaines tasks doivent être lancées à la suite de la réussite d’une requête. Par exemple, le fichier PDF d’une facture doit être généré lorsque la facture est validée. Si la validation de la facture échoue, le PDF ne doit pas être généré.

Pour cela nous utilisons des task « transactionnelles » qui ne se lancent que si les modifications en base de données sont committées. Celle-ci doivent être décorée par endi_celery.transactional_task::task_tm.

from endi_celery.transactional_task import task_tm

@task_tm
def async_internalestimation_valid_callback(document_id):
    ...