Skip to content

Commit 402df2b

Browse files
committed
implement handler exec: publish, childs, pause - unpause
1 parent e581853 commit 402df2b

19 files changed

Lines changed: 521 additions & 109 deletions

README.md

Lines changed: 233 additions & 45 deletions
Large diffs are not rendered by default.

examples/childs/consumer1.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# importing the lib
2+
from omniq.client import OmniqClient
3+
4+
# creating your handler (ctx will have all the job information and actions)
def document_worker(ctx):
    """Fan out one child job per page of the incoming document.

    Initializes a completion counter keyed on the document id, then
    publishes one job on the "pages" queue for every page so the
    children can be tracked until they all finish.
    """
    # pull the job data out of the payload
    doc_id = ctx.payload["document_id"]
    page_count = ctx.payload["pages"]

    # unique key used to track the child jobs of this document
    tracking_key = f"document:{doc_id}"

    print(f"[document_worker] Initializing completion for {page_count} pages")

    # initialize the child counter before publishing any child job
    ctx.exec.childs_init(tracking_key, page_count)

    # publish one job per page on the "pages" queue
    # (ctx.exec exposes the publisher directly)
    for page_number in range(1, page_count + 1):
        ctx.exec.publish(
            queue="pages",
            payload={
                "document_id": doc_id,
                "page": page_number,
                "completion_key": tracking_key,
            },
        )

    print("[document_worker] All page jobs published.")
32+
33+
# creating OmniQ passing redis information
connection_settings = {
    "host": "omniq-redis",
    "port": 6379,
}
omniq = OmniqClient(**connection_settings)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
    queue="documents",
    handler=document_worker,
    verbose=True,
    drain=False,
)

examples/childs/consumer2.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import time
2+
3+
# importing the lib
4+
from omniq.client import OmniqClient
5+
6+
# creating your handler (ctx will have all the job information and actions)
def page_worker(ctx):
    """Process one page job and ack it against the document's child counter.

    ``child_ack`` returns how many sibling jobs are still pending:
    > 0 while work remains, 0 only for the very last page, and -1 if
    something went wrong with the counter.
    """
    page_number = ctx.payload["page"]
    # key that groups every page of the same document
    tracking_key = ctx.payload["completion_key"]

    print(f"[page_worker] Processing page {page_number} (job_id={ctx.job_id})")
    time.sleep(1.5)  # simulate the actual page work

    # ack this job as a finished child; the number of remaining jobs comes
    # back so we can tell when the last one has executed
    pending = ctx.exec.child_ack(tracking_key)

    print(f"[page_worker] Page {page_number} done. Remaining={pending}")

    # pending is 0 only when this is the final page
    if pending == 0:
        print("[page_worker] Last page finished.")
27+
28+
# creating OmniQ passing redis information
redis_settings = {
    "host": "omniq-redis",
    "port": 6379,
}
omniq = OmniqClient(**redis_settings)

# creating the consumer that will listen and execute the actions in your handler
omniq.consume(
    queue="pages",
    handler=page_worker,
    verbose=True,
    drain=False,
)

examples/childs/publish.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# importing the lib
2+
from omniq.client import OmniqClient
3+
4+
# creating OmniQ passing redis information
omniq = OmniqClient(host="omniq-redis", port=6379)

# publishing the parent job that the document worker fans out into page jobs
document_payload = {
    # unique key used to initiate the child jobs and track them to completion
    "document_id": "doc-123",
    # every page must complete before anything else happens
    "pages": 5,
}
job_id = omniq.publish(queue="documents", payload=document_payload)
print("OK", job_id)

examples/consumer.py

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import time
2+
3+
# importing the lib
4+
from omniq.client import OmniqClient
5+
6+
# creating your handler (ctx will have all the job information and actions)
def pause_unpause_example(ctx):
    """Demonstrate pausing and resuming a queue from inside a handler.

    Pausing only blocks the start of NEW jobs on the queue; this job and
    any other already-active jobs keep running until the queue is resumed.
    """
    queue_name = "test"

    print("Waiting 2 seconds")

    # check whether this queue is paused (spoiler: it is not yet)
    paused = ctx.exec.is_paused(queue_name)
    print("Is paused", paused)
    time.sleep(2)

    print("Pausing")

    # pause the queue: active jobs are unaffected, but no new job starts
    # until the queue is resumed
    ctx.exec.pause(queue_name)

    # check again — now it is supposed to report paused
    paused = ctx.exec.is_paused(queue_name)
    print("Is paused", paused)
    time.sleep(2)

    print("Resuming")

    # resume the queue so every worker can process jobs again
    ctx.exec.resume(queue_name)

    # check one last time — it is supposed to be resumed
    paused = ctx.exec.is_paused(queue_name)
    print("Is paused", paused)
    time.sleep(2)

    print("Done")
47+
48+
# creating OmniQ passing redis information
omniq = OmniqClient(host="omniq-redis", port=6379)

# creating the consumer that will listen and execute the actions in your handler
consume_options = {
    "queue": "test",
    "handler": pause_unpause_example,
    "verbose": True,
    "drain": False,
}
omniq.consume(**consume_options)
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
# importing the lib
12
from omniq.client import OmniqClient
23

4+
# creating OmniQ passing redis information
uq = OmniqClient(host="omniq-redis", port=6379)

# publishing the job on the "test" queue with a 30 second timeout
job_id = uq.publish(
    queue="test",
    payload={"hello": "world"},
    timeout_ms=30_000,
)

examples/simple/consumer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import time
2+
3+
# importing the lib
4+
from omniq.client import OmniqClient
5+
6+
# creating your handler (ctx will have all the job information and actions)
def my_actions(ctx):
    """Minimal example handler: announce, wait two seconds, finish."""
    print("Waiting 2 seconds")
    time.sleep(2)
    print("Done")
11+
12+
# creating OmniQ passing redis information
13+
omniq = OmniqClient(
14+
host="omniq-redis",
15+
port=6379,
16+
)
17+
18+
# creating the consumer that will listen and execute the actions in your handler
19+
omniq.consume(
20+
queue="demo",
21+
handler=my_actions,
22+
verbose=True,
23+
drain=False,
24+
)

examples/simple/publish.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# importing the lib
2+
from omniq.client import OmniqClient
3+
4+
# creating OmniQ passing redis information
omniq = OmniqClient(host="omniq-redis", port=6379)

# publishing the job on the "demo" queue with a 30 second timeout
job_id = omniq.publish(
    queue="demo",
    payload={"hello": "world"},
    timeout_ms=30_000,
)

print("OK", job_id)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "omniq"
3-
version = "1.4.1"
3+
version = "1.5.0"
44
authors = [
55
{ name="Not Empty Foundation", email="dev@not-empty.org" },
66
]

0 commit comments

Comments
 (0)