In this part we cover the following topics:
Introduction to task queues
!!!To do!!!
Sources
Install
- Choosing and installing a message transport (broker). !!!something about RabbitMQ!!!
- Installing Celery is straightforward with pip:

    (base) ubd@ubd-virtual-machine:~$ conda activate test_env
    (test_env) ubd@ubd-virtual-machine:~$ pip install celery
    Collecting celery
    [...]
    Installing collected packages: billiard, vine, amqp, kombu, celery
    Successfully installed amqp-2.4.2 billiard-3.6.0.0 celery-4.3.0 kombu-4.5.0 vine-1.3.0
    (test_env) ubd@ubd-virtual-machine:~$

- Monitor Celery in real time. Flower is a real-time, web-based monitor for Celery. With Flower you can easily monitor task progress and history. As before, use pip to install Flower:

    (test_env) ubd@ubd-virtual-machine:~$ pip install flower
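To confirm that both packages ended up in the active environment, a quick check along the following lines may help. It is a minimal sketch that uses only the standard library (`importlib.metadata`, available from Python 3.8); the helper name `installed_versions` is hypothetical and not part of Celery or Flower.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Report what pip installed into the current environment.
for name, ver in installed_versions(["celery", "flower"]).items():
    print(f"{name}: {ver or 'not installed'}")
```

Run it with the same interpreter that will later start the worker (here, the one from `test_env`), since each environment has its own set of installed packages.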
Basic project
Below is the structure of our demo project.
    basic
        __init__.py
        celery.py
        tasks.py
        job.py
- Leave __init__.py empty.
- Add the following code in celery.py:

    from celery import Celery

    app = Celery('basic',
                 broker='amqp://localhost',
                 backend='rpc://',
                 include=['basic.tasks'])

- In the
tasks.py file, define our tasks:

    from basic.celery import app
    from time import ctime, sleep

    @app.task
    def task_add(x, y, workingTime, name):
        print("BEGIN task_add: name %s, working time: %s, (%s)" % (name, workingTime, ctime()))
        sleep(workingTime)  # simulate workingTime seconds of work
        print("END task_add: name %s (%s)" % (name, workingTime))
        return x + y

    @app.task
    def task_mul(x, y, workingTime, name):
        print("BEGIN task_mul: name %s, working time: %s, (%s)" % (name, workingTime, ctime()))
        sleep(workingTime)  # simulate workingTime seconds of work
        print("END task_mul: name %s (%s)" % (name, workingTime))
        return x * y

- Add the following code in
job.py:

    from .tasks import task_add, task_mul
    from time import ctime, sleep

    if __name__ == '__main__':
        results = []

        # Submit three additions and three multiplications to the queue.
        name = "add_1"
        result_add_1 = task_add.delay(1, 2, 30, name)
        results.append({"name": name, "result": result_add_1})

        name = "add_2"
        result_add_2 = task_add.delay(3, 4, 20, name)
        results.append({"name": name, "result": result_add_2})

        name = "add_3"
        result_add_3 = task_add.delay(5, 6, 10, name)
        results.append({"name": name, "result": result_add_3})

        name = "mul_1"
        result_mul_1 = task_mul.delay(1, 2, 30, name)
        results.append({"name": name, "result": result_mul_1})

        name = "mul_2"
        result_mul_2 = task_mul.delay(3, 4, 20, name)
        results.append({"name": name, "result": result_mul_2})

        name = "mul_3"
        result_mul_3 = task_mul.delay(5, 6, 10, name)
        results.append({"name": name, "result": result_mul_3})

        # Poll once per second until all six tasks are ready.
        completed = 0
        while completed < 6:
            completed = 0
            print("\n==========\n%s\n==========" % (ctime()))
            for result in results:
                print("Task %s: ready: %s, result: %s" % (result["name"], result["result"].ready(), result["result"].result))
                if result["result"].ready():
                    completed = completed + 1
            sleep(1)

- Open a terminal, let's call it
Terminal 1, and start the Celery worker:

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ celery --app=basic worker --loglevel=info

     -------------- celery@ubd-virtual-machine v4.3.0 (rhubarb)
    ---- **** -----
    --- * *** * -- Linux-4.15.0-47-generic-x86_64-with-debian-buster-sid 2019-04-29 13:53:04
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app: basic:0x7fce9172a1d0
    - ** ---------- .> transport: amqp://guest:**@localhost:5672//
    - ** ---------- .> results: rpc://
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery exchange=celery(direct) key=celery

    [tasks]
      . basic.tasks.task_add
      . basic.tasks.task_mul

    [2019-04-29 13:53:04,269: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2019-04-29 13:53:04,328: INFO/MainProcess] mingle: searching for neighbors
    [2019-04-29 13:53:05,430: INFO/MainProcess] mingle: all alone
    [2019-04-29 13:53:05,511: INFO/MainProcess] celery@ubd-virtual-machine ready.
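Once the worker is ready, job.py submits its six tasks and polls them once per second. That polling loop could be factored into a small helper, sketched below. The function name `wait_for_all` and its `poll_interval` and `timeout` parameters are hypothetical; the helper relies only on each result object offering `ready()` and `.result`, which Celery's `AsyncResult` provides, so it makes no other assumptions about Celery.

```python
import time


def wait_for_all(results, poll_interval=1.0, timeout=None):
    """Poll until every entry is ready, then return {name: value}.

    `results` is a list of {"name": ..., "result": ...} dicts as built in
    job.py. Each "result" only needs ready() and .result.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        # Names of the tasks that have not finished yet.
        pending = [r["name"] for r in results if not r["result"].ready()]
        if not pending:
            return {r["name"]: r["result"].result for r in results}
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("still pending: " + ", ".join(pending))
        time.sleep(poll_interval)
```

In job.py this would replace the `while completed < 6` loop with a single call such as `values = wait_for_all(results)`. The optional timeout is a design choice of this sketch: it keeps the client from waiting forever if no worker is running.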
- Open a second terminal, let's call it Terminal 2, and run the job. The run below lasted from 13:53:36 to 13:55:37 (2 min 1 s):

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ python -m basic.job

    ==========
    Mon Apr 29 13:53:36 2019
    ==========
    Task add_1: ready: False, result: None
    Task add_2: ready: False, result: None
    Task add_3: ready: False, result: None
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:54:06 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: False, result: None
    Task add_3: ready: False, result: None
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:54:27 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: False, result: None
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:54:36 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:55:06 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: True, result: 2
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:55:26 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: True, result: 2
    Task mul_2: ready: True, result: 12
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 13:55:37 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: True, result: 2
    Task mul_2: ready: True, result: 12
    Task mul_3: ready: True, result: 30
    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$

- At the same time as in
Terminal 2, we should see output similar to the following in Terminal 1:

    [2019-04-29 13:53:35,838: INFO/MainProcess] Received task: basic.tasks.task_add[5524bd4e-9b08-46d5-80b5-8fb1e10ac015]
    [2019-04-29 13:53:35,840: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_1, working time: 30, (Mon Apr 29 13:53:35 2019)
    [2019-04-29 13:53:35,844: INFO/MainProcess] Received task: basic.tasks.task_add[d3f20232-2a5f-4478-864e-dd0fc950ed8a]
    [2019-04-29 13:53:35,856: INFO/MainProcess] Received task: basic.tasks.task_add[b528ac55-3e76-49ba-b90e-0232376096d5]
    [2019-04-29 13:53:35,864: INFO/MainProcess] Received task: basic.tasks.task_mul[d0db4c9a-57fc-48a2-9361-adfbbe4a8e13]
    [2019-04-29 13:53:35,881: INFO/MainProcess] Received task: basic.tasks.task_mul[ce455da1-e4e2-4406-866f-3cc5fd370ad2]
    [2019-04-29 13:54:05,872: WARNING/ForkPoolWorker-1] END task_add: name add_1 (30)
    [2019-04-29 13:54:06,031: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[5524bd4e-9b08-46d5-80b5-8fb1e10ac015] succeeded in 30.191162027000246s: 3
    [2019-04-29 13:54:06,039: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_2, working time: 20, (Mon Apr 29 13:54:06 2019)
    [2019-04-29 13:54:06,041: INFO/MainProcess] Received task: basic.tasks.task_mul[f71876b5-e025-4d0d-a09c-3eff061b806e]
    [2019-04-29 13:54:26,066: WARNING/ForkPoolWorker-1] END task_add: name add_2 (20)
    [2019-04-29 13:54:26,068: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[d3f20232-2a5f-4478-864e-dd0fc950ed8a] succeeded in 20.028893705999508s: 7
    [2019-04-29 13:54:26,070: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_3, working time: 10, (Mon Apr 29 13:54:26 2019)
    [2019-04-29 13:54:36,083: WARNING/ForkPoolWorker-1] END task_add: name add_3 (10)
    [2019-04-29 13:54:36,089: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[b528ac55-3e76-49ba-b90e-0232376096d5] succeeded in 10.019492949002597s: 11
    [2019-04-29 13:54:36,098: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_1, working time: 30, (Mon Apr 29 13:54:36 2019)
    [2019-04-29 13:55:06,128: WARNING/ForkPoolWorker-1] END task_mul: name mul_1 (30)
    [2019-04-29 13:55:06,130: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[d0db4c9a-57fc-48a2-9361-adfbbe4a8e13] succeeded in 30.03216361900195s: 2
    [2019-04-29 13:55:06,131: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_2, working time: 20, (Mon Apr 29 13:55:06 2019)
    [2019-04-29 13:55:26,144: WARNING/ForkPoolWorker-1] END task_mul: name mul_2 (20)
    [2019-04-29 13:55:26,147: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[ce455da1-e4e2-4406-866f-3cc5fd370ad2] succeeded in 20.01530200599882s: 12
    [2019-04-29 13:55:26,148: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_3, working time: 10, (Mon Apr 29 13:55:26 2019)
    [2019-04-29 13:55:36,158: WARNING/ForkPoolWorker-1] END task_mul: name mul_3 (10)
    [2019-04-29 13:55:36,160: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[f71876b5-e025-4d0d-a09c-3eff061b806e] succeeded in 10.012230078998982s: 30

- Run Flower. With Celery still running in
Terminal 1, open a new terminal, Terminal 3, and run Flower in it:

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ celery --app=basic flower --loglevel=info
    [I 190429 14:16:05 command:136] Visit me at http://localhost:5555
    [I 190429 14:16:05 command:141] Broker: amqp://guest:**@localhost:5672//
    [I 190429 14:16:05 command:144] Registered tasks:
    ['basic.tasks.task_add',
     'basic.tasks.task_mul',
     'celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
    [I 190429 14:16:05 mixins:229] Connected to amqp://guest:**@127.0.0.1:5672//

- Open the second terminal,
Terminal 2, and run our job:

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ python -m basic.job

This time, open http://localhost:5555 in a browser and check what Flower lets us monitor and control.
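Besides the web pages, Flower also exposes a REST API, which can be handy for scripted checks. The sketch below assumes the `GET /api/tasks` endpoint returns a JSON object keyed by task id, with a `"state"` field in each entry; verify this against the Flower version in use, since newer releases may require authentication for API calls. The function names `fetch_tasks` and `count_by_state` are hypothetical.

```python
import json
from collections import Counter
from urllib.request import urlopen

FLOWER_URL = "http://localhost:5555"  # address printed by Flower at startup


def fetch_tasks(base_url=FLOWER_URL):
    """Fetch the task list from Flower's REST API (assumed: GET /api/tasks)."""
    with urlopen(base_url + "/api/tasks", timeout=5) as response:
        return json.load(response)


def count_by_state(tasks):
    """Count tasks per state; `tasks` maps task id -> info dict."""
    # Entries without a "state" key are grouped under "UNKNOWN".
    return Counter(info.get("state", "UNKNOWN") for info in tasks.values())
```

With Flower running in Terminal 3, calling `count_by_state(fetch_tasks())` while the job executes would give a per-state tally of the tasks Flower has seen so far.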
Notice that, according to the Celery settings report displayed at startup,

    - *** --- * --- .> concurrency: 1 (prefork)

only one task is executed at a time.
As before, in Terminal 1 we should see the following results:

    [2019-04-29 14:15:20,472: INFO/MainProcess] Events of group {task} enabled by remote.
    [2019-04-29 14:17:18,352: INFO/MainProcess] Received task: basic.tasks.task_add[6f636594-754c-493d-b4c1-92833aa7efc2]
    [2019-04-29 14:17:18,354: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_1, working time: 30, (Mon Apr 29 14:17:18 2019)
    [2019-04-29 14:17:18,360: INFO/MainProcess] Received task: basic.tasks.task_add[731aa059-eaa3-4c87-a694-9b7a427f52a5]
    [2019-04-29 14:17:18,378: INFO/MainProcess] Received task: basic.tasks.task_add[dfcd0feb-540b-4491-aa9b-10f6d15ea65f]
    [2019-04-29 14:17:18,383: INFO/MainProcess] Received task: basic.tasks.task_mul[d88b80cd-7c05-47f1-a9d5-d9f47d100170]
    [2019-04-29 14:17:18,396: INFO/MainProcess] Received task: basic.tasks.task_mul[a14c0456-ed1b-4f40-b66f-9a31e36bb860]
    [2019-04-29 14:17:48,385: WARNING/ForkPoolWorker-1] END task_add: name add_1 (30)
    [2019-04-29 14:17:48,575: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[6f636594-754c-493d-b4c1-92833aa7efc2] succeeded in 30.22108956000011s: 3
    [2019-04-29 14:17:48,600: INFO/MainProcess] Received task: basic.tasks.task_mul[a2e79e89-ad15-468f-9f66-9573b9130ee7]
    [2019-04-29 14:17:48,602: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_2, working time: 20, (Mon Apr 29 14:17:48 2019)
    [2019-04-29 14:18:08,619: WARNING/ForkPoolWorker-1] END task_add: name add_2 (20)
    [2019-04-29 14:18:08,624: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[731aa059-eaa3-4c87-a694-9b7a427f52a5] succeeded in 20.021931107003184s: 7
    [2019-04-29 14:18:08,649: WARNING/ForkPoolWorker-1] BEGIN task_add: name add_3, working time: 10, (Mon Apr 29 14:18:08 2019)
    [2019-04-29 14:18:18,664: WARNING/ForkPoolWorker-1] END task_add: name add_3 (10)
    [2019-04-29 14:18:18,666: INFO/ForkPoolWorker-1] Task basic.tasks.task_add[dfcd0feb-540b-4491-aa9b-10f6d15ea65f] succeeded in 10.017316512999969s: 11
    [2019-04-29 14:18:18,682: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_1, working time: 30, (Mon Apr 29 14:18:18 2019)
    [2019-04-29 14:18:48,702: WARNING/ForkPoolWorker-1] END task_mul: name mul_1 (30)
    [2019-04-29 14:18:48,704: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[d88b80cd-7c05-47f1-a9d5-d9f47d100170] succeeded in 30.02223128499827s: 2
    [2019-04-29 14:18:48,708: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_2, working time: 20, (Mon Apr 29 14:18:48 2019)
    [2019-04-29 14:19:08,717: WARNING/ForkPoolWorker-1] END task_mul: name mul_2 (20)
    [2019-04-29 14:19:08,719: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[a14c0456-ed1b-4f40-b66f-9a31e36bb860] succeeded in 20.01160486899971s: 12
    [2019-04-29 14:19:08,723: WARNING/ForkPoolWorker-1] BEGIN task_mul: name mul_3, working time: 10, (Mon Apr 29 14:19:08 2019)
    [2019-04-29 14:19:18,734: WARNING/ForkPoolWorker-1] END task_mul: name mul_3 (10)
    [2019-04-29 14:19:18,736: INFO/ForkPoolWorker-1] Task basic.tasks.task_mul[a2e79e89-ad15-468f-9f66-9573b9130ee7] succeeded in 10.012880333000794s: 30

- To allow (or force) Celery to execute more than one task at a time, use the
-c option (in Terminal 1, press Control + C to stop the running Celery worker first):

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ celery -c 6 --app=basic worker --loglevel=info

     -------------- celery@ubd-virtual-machine v4.3.0 (rhubarb)
    ---- **** -----
    --- * *** * -- Linux-4.15.0-47-generic-x86_64-with-debian-buster-sid 2019-04-29 14:23:36
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app: basic:0x7f348e4b9a90
    - ** ---------- .> transport: amqp://guest:**@localhost:5672//
    - ** ---------- .> results: rpc://
    - *** --- * --- .> concurrency: 6 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery exchange=celery(direct) key=celery

    [tasks]
      . basic.tasks.task_add
      . basic.tasks.task_mul

    [2019-04-29 14:23:36,679: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2019-04-29 14:23:36,740: INFO/MainProcess] mingle: searching for neighbors
    [2019-04-29 14:23:37,845: INFO/MainProcess] mingle: all alone
    [2019-04-29 14:23:37,923: INFO/MainProcess] celery@ubd-virtual-machine ready.

- Now execution of our job should be much faster (use
Terminal 2); this run lasted from 14:25:08 to 14:25:39 (31 s):

    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$ python -m basic.job

    ==========
    Mon Apr 29 14:25:08 2019
    ==========
    Task add_1: ready: False, result: None
    Task add_2: ready: False, result: None
    Task add_3: ready: False, result: None
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: False, result: None
    [...]

    ==========
    Mon Apr 29 14:25:19 2019
    ==========
    Task add_1: ready: False, result: None
    Task add_2: ready: False, result: None
    Task add_3: ready: True, result: 11
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: True, result: 30
    [...]

    ==========
    Mon Apr 29 14:25:28 2019
    ==========
    Task add_1: ready: False, result: None
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: False, result: None
    Task mul_2: ready: False, result: None
    Task mul_3: ready: True, result: 30

    ==========
    Mon Apr 29 14:25:29 2019
    ==========
    Task add_1: ready: False, result: None
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: False, result: None
    Task mul_2: ready: True, result: 12
    Task mul_3: ready: True, result: 30
    [...]

    ==========
    Mon Apr 29 14:25:39 2019
    ==========
    Task add_1: ready: True, result: 3
    Task add_2: ready: True, result: 7
    Task add_3: ready: True, result: 11
    Task mul_1: ready: True, result: 2
    Task mul_2: ready: True, result: 12
    Task mul_3: ready: True, result: 30
    (test_env) ubd@ubd-virtual-machine:~/Pulpit/code/celery$
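The two run times reported above (about 2 minutes with concurrency 1, about 31 seconds with concurrency 6) can be approximated with a small scheduling model. The sketch below is a simplification, not Celery's actual scheduler: it assumes tasks are handed, in submission order, to whichever worker process becomes free first, and it ignores prefetching and messaging overhead. The function name `makespan` is hypothetical.

```python
import heapq


def makespan(durations, workers):
    """Finish time when tasks are taken in order by the first free worker."""
    free_at = [0] * workers  # time at which each worker process becomes idle
    heapq.heapify(free_at)
    finish = 0
    for duration in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        end = start + duration
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


# Sleep times passed by job.py: add_1..add_3, then mul_1..mul_3.
durations = [30, 20, 10, 30, 20, 10]
for workers in (1, 6):
    print(workers, "worker(s):", makespan(durations, workers), "s")
```

Under these assumptions the model gives 120 s for one process and 30 s for six, close to the measured 2 min 1 s and 31 s. With six processes the total is bounded by the longest single task, so adding more processes would not shorten this particular job further.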