First, set the root logger and "celery.task.default" to use DEBUG mode:
import logging logging.getLogger('celery.task.default').setLevel(pythonLogging.DEBUG) logging.getLogger().setLevel(pythonLogging.DEBUG)
Set ALWAYS_EAGER mode so that Celery will always invoke tasks locally instead of dispatching to the Celery machine.
Set EAGER_PROPAGATES_EXCEPTION so that any exceptions within tasks will be bubbled up so that you can actually see any exceptions that may cause your batch calls to fail (i.e. any uncaught exception can cause a fatal error!)
from celery import current_app current_app.conf.CELERY_ALWAYS_EAGER = True current_app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True from celery.utils import LOG_LEVELS current_app.conf.CELERYD_LOG_LEVEL = LOG_LEVELS['DEBUG'] # pretty much the same as logging.DEBUG
Finally, if you are invoking a task from the same Python script, you should import the task_name as if it were being imported, even if the function is declared within the same file. The reason is that when running the Celeryd daemon and looks for registered tasks, Celery will consider the task function you invoked to come from the "__main__" class. The way to get around it is to import the task residing in the same file, assuming your PYTHONPATH is set correctly.
from celery.decorators import task @task def task_name(): print "here" return 1 if "__name__ == "__main__": from
import task_name task_name.apply_async()
(Note: This information has been updated to reflect Celery v2.3.3 inner-workings).