From 9fae34a190bcb04a0bea7428eaf5c71188f93f9e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 23 Dec 2020 12:45:27 -0500 Subject: [PATCH 01/16] Add our version of the std lib's "worker pool" This is a draft of the `tractor` way to implement the example from the "process pool" in the stdlib's `concurrent.futures` docs: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example Our runtime is of course slower to startup but once up we of course get the same performance, this confirms that we need to focus some effort now on warm up and teardown times. The mp forkserver method definitely improves startup delay; rolling our own will likely be a good hot spot to play with. What's really nice is our implementation is done in approx 10th the code ;) Also, do we want to offer an interface that yields results as they arrive? Relates to #175 --- examples/concurrent_actors_primes.py | 119 ++++++++++++++++++++++++++ examples/concurrent_futures_primes.py | 40 +++++++++ 2 files changed, 159 insertions(+) create mode 100644 examples/concurrent_actors_primes.py create mode 100644 examples/concurrent_futures_primes.py diff --git a/examples/concurrent_actors_primes.py b/examples/concurrent_actors_primes.py new file mode 100644 index 000000000..2867fe54e --- /dev/null +++ b/examples/concurrent_actors_primes.py @@ -0,0 +1,119 @@ +""" +Demonstration of the prime number detector example from the +``concurrent.futures`` docs: + +https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example + +This uses no extra threads or fancy semaphores besides ``tractor``'s +(TCP) channels. 
+ +""" +from contextlib import asynccontextmanager +from typing import List, Callable +import itertools +import math +import time + +import tractor +import trio + + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + + +@asynccontextmanager +async def worker_pool(workers=4): + """Though it's a trivial special case for ``tractor``, the well + known "worker pool" seems to be the defacto "I want this process + pattern" for most parallelism pilgrims. + + """ + + async with tractor.open_nursery() as tn: + + portals = [] + results = [] + + for i in range(workers): + + # this starts a new sub-actor (process + trio runtime) and + # stores it's "portal" for later use to "submit jobs" (ugh). + portals.append( + await tn.start_actor( + f'worker_{i}', + rpc_module_paths=[__name__], + ) + ) + + async def map( + worker_func: Callable[[int], bool], + sequence: List[int] + ) -> List[bool]: + + # define an async (local) task to collect results from workers + async def collect_portal_result(func, value, portal): + + results.append((value, await portal.run(func, n=value))) + + async with trio.open_nursery() as n: + + for value, portal in zip(sequence, itertools.cycle(portals)): + + n.start_soon( + collect_portal_result, + worker_func, + value, + portal + ) + + return results + + yield map + + # tear down all "workers" + await tn.cancel() + + +async def main(): + async with worker_pool() as actor_map: + + start = time.time() + # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + for number, prime in await actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + 
start = time.time() + tractor.run( + main, + loglevel='ERROR', + + # uncomment to use ``multiprocessing`` fork server backend + # which gives a startup time boost at the expense of nested + # processs scalability + # start_method='forkserver') + ) + print(f'script took {time.time() - start} seconds') diff --git a/examples/concurrent_futures_primes.py b/examples/concurrent_futures_primes.py new file mode 100644 index 000000000..81ae23d60 --- /dev/null +++ b/examples/concurrent_futures_primes.py @@ -0,0 +1,40 @@ +import time +import concurrent.futures +import math + +PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + +def is_prime(n): + if n < 2: + return False + if n == 2: + return True + if n % 2 == 0: + return False + + sqrt_n = int(math.floor(math.sqrt(n))) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return False + return True + +def main(): + with concurrent.futures.ProcessPoolExecutor() as executor: + start = time.time() + + for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): + print('%d is prime: %s' % (number, prime)) + + print(f'processing took {time.time() - start} seconds') + +if __name__ == '__main__': + + start = time.time() + main() + print(f'script took {time.time() - start} seconds') From da27d96682d065ca8812466e3d8ae2d9c06327ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 13:30:24 -0500 Subject: [PATCH 02/16] Make new paralellism example space --- examples/{ => parallelism}/concurrent_actors_primes.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/{ => parallelism}/concurrent_actors_primes.py (100%) diff --git a/examples/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py similarity index 100% rename from examples/concurrent_actors_primes.py rename to examples/parallelism/concurrent_actors_primes.py From f715a0cae81ee4fe554509ecd25e336e5c589a85 Mon Sep 17 00:00:00 2001 From: Tyler 
Goodlet Date: Sun, 17 Jan 2021 13:34:52 -0500 Subject: [PATCH 03/16] Remove use of tractor.run() --- .../parallelism/concurrent_actors_primes.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 2867fe54e..7df2e43fa 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -50,7 +50,14 @@ async def worker_pool(workers=4): """ - async with tractor.open_nursery() as tn: + async with tractor.open_nursery( + loglevel='ERROR', + + # uncomment to use ``multiprocessing`` fork server backend + # which gives a startup time boost at the expense of nested + # processs scalability + # start_method='forkserver') + ) as tn: portals = [] results = [] @@ -62,7 +69,7 @@ async def worker_pool(workers=4): portals.append( await tn.start_actor( f'worker_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) ) @@ -105,15 +112,9 @@ async def main(): print(f'processing took {time.time() - start} seconds') + if __name__ == '__main__': - start = time.time() - tractor.run( - main, - loglevel='ERROR', - # uncomment to use ``multiprocessing`` fork server backend - # which gives a startup time boost at the expense of nested - # processs scalability - # start_method='forkserver') - ) + start = time.time() + trio.run(main) print(f'script took {time.time() - start} seconds') From a10c4b172a9445428ce4515fb8a9fcc4a84be732 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 13:57:42 -0500 Subject: [PATCH 04/16] Yield results on demand using a mem chan --- .../parallelism/concurrent_actors_primes.py | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 7df2e43fa..874a24f19 100644 --- a/examples/parallelism/concurrent_actors_primes.py 
+++ b/examples/parallelism/concurrent_actors_primes.py @@ -24,7 +24,8 @@ 112272535095293, 115280095190773, 115797848077099, - 1099726899285419] + 1099726899285419, +] def is_prime(n): @@ -50,17 +51,10 @@ async def worker_pool(workers=4): """ - async with tractor.open_nursery( - loglevel='ERROR', - - # uncomment to use ``multiprocessing`` fork server backend - # which gives a startup time boost at the expense of nested - # processs scalability - # start_method='forkserver') - ) as tn: + async with tractor.open_nursery() as tn: portals = [] - results = [] + snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES)) for i in range(workers): @@ -79,35 +73,37 @@ async def map( ) -> List[bool]: # define an async (local) task to collect results from workers - async def collect_portal_result(func, value, portal): - - results.append((value, await portal.run(func, n=value))) + async def send_result(func, value, portal): + await snd_chan.send((value, await portal.run(func, n=value))) async with trio.open_nursery() as n: for value, portal in zip(sequence, itertools.cycle(portals)): - n.start_soon( - collect_portal_result, + send_result, worker_func, value, portal ) - return results + # deliver results as they arrive + for _ in range(len(sequence)): + yield await recv_chan.receive() yield map - # tear down all "workers" + # tear down all "workers" on pool close await tn.cancel() async def main(): + async with worker_pool() as actor_map: start = time.time() # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - for number, prime in await actor_map(is_prime, PRIMES): + async for number, prime in actor_map(is_prime, PRIMES): + print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds') From dae154e470a6351c93ae52272cc9aada7427a718 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 15:41:35 -0500 Subject: [PATCH 05/16] More comments --- .../parallelism/concurrent_actors_primes.py | 17 +++++++++-------- 1 file 
changed, 9 insertions(+), 8 deletions(-) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 874a24f19..1f5fe484f 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -4,8 +4,8 @@ https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example -This uses no extra threads or fancy semaphores besides ``tractor``'s -(TCP) channels. +This uses no extra threads, fancy semaphores or futures; all we need +is ``tractor``'s channels. """ from contextlib import asynccontextmanager @@ -46,11 +46,12 @@ def is_prime(n): @asynccontextmanager async def worker_pool(workers=4): """Though it's a trivial special case for ``tractor``, the well - known "worker pool" seems to be the defacto "I want this process - pattern" for most parallelism pilgrims. + known "worker pool" seems to be the defacto "but, I want this + process pattern!" for most parallelism pilgrims. + Yes, the workers stay alive (and ready for work) until you close + the context. 
""" - async with tractor.open_nursery() as tn: portals = [] @@ -67,7 +68,7 @@ async def worker_pool(workers=4): ) ) - async def map( + async def _map( worker_func: Callable[[int], bool], sequence: List[int] ) -> List[bool]: @@ -90,7 +91,8 @@ async def send_result(func, value, portal): for _ in range(len(sequence)): yield await recv_chan.receive() - yield map + # deliver the parallel "worker mapper" to user code + yield _map # tear down all "workers" on pool close await tn.cancel() @@ -110,7 +112,6 @@ async def main(): if __name__ == '__main__': - start = time.time() trio.run(main) print(f'script took {time.time() - start} seconds') From 582eda4afd12e4666fee2591a7bb37a315cff395 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 15:42:05 -0500 Subject: [PATCH 06/16] Add concise readme example --- examples/parallelism/we_are_processes.py | 34 ++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 examples/parallelism/we_are_processes.py diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py new file mode 100644 index 000000000..ac4b594de --- /dev/null +++ b/examples/parallelism/we_are_processes.py @@ -0,0 +1,34 @@ +""" +Run with a process monitor from a terminal using: +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py || kill $! 
+ +""" +from multiprocessing import cpu_count +import os + +import tractor +import trio + + +async def target(): + print(f"Yo, i'm {tractor.current_actor().name} " + f"running in pid {os.getpid()}") + await trio.sleep_forever() + + +async def main(): + + async with tractor.open_nursery() as n: + + for i in range(cpu_count()): + await n.run_in_actor(target, name=f'worker_{i}') + + print('This process tree will self-destruct in 1 sec...') + await trio.sleep(1) + + # you could have done this yourself + raise Exception('Self Destructed') + + +if __name__ == '__main__': + trio.run(main) From 47651eaf91545adba6ff0927a53751489ce54e35 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 21:24:43 -0500 Subject: [PATCH 07/16] Contain the error --- examples/parallelism/we_are_processes.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/parallelism/we_are_processes.py b/examples/parallelism/we_are_processes.py index ac4b594de..8283b9c5b 100644 --- a/examples/parallelism/we_are_processes.py +++ b/examples/parallelism/we_are_processes.py @@ -1,6 +1,6 @@ """ Run with a process monitor from a terminal using: -$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py || kill $! +$TERM -e watch -n 0.1 "pstree -a $$" & python examples/parallelism/we_are_processes.py && kill $! 
""" from multiprocessing import cpu_count @@ -11,7 +11,7 @@ async def target(): - print(f"Yo, i'm {tractor.current_actor().name} " + print(f"Yo, i'm '{tractor.current_actor().name}' " f"running in pid {os.getpid()}") await trio.sleep_forever() @@ -31,4 +31,7 @@ async def main(): if __name__ == '__main__': - trio.run(main) + try: + trio.run(main) + except Exception: + print('Zombies Contained') From 5db737d3688cfc99fbdd132742573f6d91cb7c6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Jan 2021 21:24:56 -0500 Subject: [PATCH 08/16] Run parallel examples --- tests/test_docs_examples.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index ea676a26b..3778e0e9a 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -78,13 +78,13 @@ def run(script_code): @pytest.mark.parametrize( 'example_script', - [ - f for f in os.listdir(examples_dir()) - if ( - ('__' not in f) and - ('debugging' not in f) - ) - ], + + # walk yields: (dirpath, dirnames, filenames) + [(p[0], f) for p in os.walk(examples_dir()) for f in p[2] + + if '__' not in f + and 'debugging' not in p[0] + ] ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user @@ -95,7 +95,7 @@ def test_example(run_example_in_subproc, example_script): test directory and invoke the script as a module with ``python -m test_example``. 
""" - ex_file = os.path.join(examples_dir(), example_script) + ex_file = os.path.join(*example_script) with open(ex_file, 'r') as ex: code = ex.read() From 2ed071c9033cb3a2e04af08231f43b65f1b50673 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 13:14:17 -0500 Subject: [PATCH 09/16] Add `aclosing()` around asyn gen loop --- examples/parallelism/concurrent_actors_primes.py | 8 +++++--- examples/{ => parallelism}/concurrent_futures_primes.py | 0 2 files changed, 5 insertions(+), 3 deletions(-) rename examples/{ => parallelism}/concurrent_futures_primes.py (100%) diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index 1f5fe484f..3ff8dab60 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -16,6 +16,7 @@ import tractor import trio +from async_generator import aclosing PRIMES = [ @@ -103,10 +104,11 @@ async def main(): async with worker_pool() as actor_map: start = time.time() - # for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): - async for number, prime in actor_map(is_prime, PRIMES): - print(f'{number} is prime: {prime}') + async with aclosing(actor_map(is_prime, PRIMES)) as results: + async for number, prime in results: + + print(f'{number} is prime: {prime}') print(f'processing took {time.time() - start} seconds') diff --git a/examples/concurrent_futures_primes.py b/examples/parallelism/concurrent_futures_primes.py similarity index 100% rename from examples/concurrent_futures_primes.py rename to examples/parallelism/concurrent_futures_primes.py From 5da86a0e48a065b4079090eee672a32965edb5ef Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 13:20:24 -0500 Subject: [PATCH 10/16] Ignore type checks on stdlib overrides --- tractor/_forkserver_override.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index 
25134ffff..d799bb819 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -6,6 +6,8 @@ .. note:: There is no type hinting in this code base (yet) to remain as a close as possible to upstream. """ +# type: ignore + import os import socket import signal From ce6123081554461bb6122128f65c41507807fe29 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 13:37:48 -0500 Subject: [PATCH 11/16] Fix more stdlib typing issues with latest mypy --- tractor/_mp_fixup_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_mp_fixup_main.py b/tractor/_mp_fixup_main.py index 7869561be..78d3e9f03 100644 --- a/tractor/_mp_fixup_main.py +++ b/tractor/_mp_fixup_main.py @@ -67,7 +67,7 @@ def _fixup_main_from_name(mod_name: str) -> None: main_module = types.ModuleType("__mp_main__") main_content = runpy.run_module(mod_name, run_name="__mp_main__", - alter_sys=True) + alter_sys=True) # type: ignore main_module.__dict__.update(main_content) sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module @@ -95,6 +95,6 @@ def _fixup_main_from_path(main_path: str) -> None: # old_main_modules.append(current_main) main_module = types.ModuleType("__mp_main__") main_content = runpy.run_path(main_path, - run_name="__mp_main__") + run_name="__mp_main__") # type: ignore main_module.__dict__.update(main_content) sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module From 7f8c5cdfe6279f89997552c95a7c20021515f3f8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 14:54:46 -0500 Subject: [PATCH 12/16] Add an actor "state mutation" via messages example --- examples/actors/mutate_remote_state.py | 51 ++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 examples/actors/mutate_remote_state.py diff --git a/examples/actors/mutate_remote_state.py b/examples/actors/mutate_remote_state.py new file mode 100644 index 000000000..1d6554c5c --- /dev/null +++ 
b/examples/actors/mutate_remote_state.py @@ -0,0 +1,51 @@ +from itertools import cycle +from pprint import pformat + +import trio +import tractor + +_snd_chan, _recv_chan = trio.open_memory_channel(100) +_actor_state = {'some_state_stuff': None} + + +async def update_local_state(msg: dict): + + global _actor_state + actor = tractor.current_actor() + + print(f'Yo we got a message {msg}') + + # update the "actor state" + _actor_state.update(msg) + + print(f'New local "state" for {actor.uid} is {pformat(_actor_state)}') + + # we're done so exit this task running in the subactor + + +async def main(): + # Main process/thread that spawns one sub-actor and sends messages + # to it to update it's state. + + actor_portals = [] + + # XXX: that subactor can **not** outlive it's parent, this is SC. + async with tractor.open_nursery() as tn: + + portal = await tn.start_actor('even_boy', enable_modules=[__name__]) + actor_portals.append(portal) + + portal = await tn.start_actor('odd_boy', enable_modules=[__name__]) + actor_portals.append(portal) + + for i, (count, portal) in enumerate( + zip(range(100), cycle(actor_portals)) + ): + await portal.run(update_local_state, msg={f'msg_{i}': count}) + + # blocks here indefinitely synce we spawned "daemon actors using + # .start_actor()`, you'll need to control-c to cancel. + + +if __name__ == '__main__': + trio.run(main) From 47d7b603db2f07f43826a0c856432518f087d09e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 15:18:52 -0500 Subject: [PATCH 13/16] Use a global dataclass instead, cuz we like "objects"? 
--- examples/actors/mutate_remote_state.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/examples/actors/mutate_remote_state.py b/examples/actors/mutate_remote_state.py index 1d6554c5c..26ab8645e 100644 --- a/examples/actors/mutate_remote_state.py +++ b/examples/actors/mutate_remote_state.py @@ -1,24 +1,37 @@ from itertools import cycle from pprint import pformat +from dataclasses import dataclass, field import trio import tractor -_snd_chan, _recv_chan = trio.open_memory_channel(100) -_actor_state = {'some_state_stuff': None} + +@dataclass +class MyProcessStateThing: + state: dict = field(default_factory=dict) + + def update(self, msg: dict): + self.state.update(msg) + + +_actor_state = MyProcessStateThing() async def update_local_state(msg: dict): + """Update process-local state from sent message and exit. - global _actor_state + """ actor = tractor.current_actor() + global _actor_state + + print(f'Yo we got a message {msg}') # update the "actor state" _actor_state.update(msg) - print(f'New local "state" for {actor.uid} is {pformat(_actor_state)}') + print(f'New local "state" for {actor.uid} is {pformat(_actor_state.state)}') # we're done so exit this task running in the subactor @@ -43,7 +56,7 @@ async def main(): ): await portal.run(update_local_state, msg={f'msg_{i}': count}) - # blocks here indefinitely synce we spawned "daemon actors using + # blocks here indefinitely synce we spawned "daemon actors" using # .start_actor()`, you'll need to control-c to cancel. 
From 70c7e098314872451382929d0ff6a17b5fa8a0ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Jan 2021 20:41:03 -0500 Subject: [PATCH 14/16] Add class style "actors" example with client proxy API --- examples/actors/ray_style_classes.py | 153 +++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 examples/actors/ray_style_classes.py diff --git a/examples/actors/ray_style_classes.py b/examples/actors/ray_style_classes.py new file mode 100644 index 000000000..e304d9307 --- /dev/null +++ b/examples/actors/ray_style_classes.py @@ -0,0 +1,153 @@ +import inspect +from typing import Any +from functools import partial +from contextlib import asynccontextmanager, AsyncExitStack +from itertools import cycle +from pprint import pformat + +import trio +import tractor + + +log = tractor.log.get_logger(__name__) + + +class ActorState: + """Singlteton actor per process. + + """ + # this is a class defined variable and is thus both + # singleton across object instances and task safe. + state: dict = {} + + def update(self, msg: dict) -> None: + _actor = tractor.current_actor() + + print(f'Yo we got a message {msg}') + self.state.update(msg) + + print(f'New local "state" for {_actor.uid} is {pformat(self.state)}') + + def close(self): + # gives headers showing which process and task is active + log.info('Actor state is closing') + + # if we wanted to support spawning or talking to other + # actors we can do that using a portal map collection? + # _portals: dict = {} + + +async def _run_proxy_method( + meth: str, + msg: dict, +) -> Any: + """Update process-local state from sent message and exit. + + """ + # Create a new actor instance per call. + # We can make this persistent by storing it either + # in a global var or are another clas scoped variable? + # If you want it somehow persisted in another namespace + # I'd be interested to know "where". 
+ actor = ActorState() + if meth != 'close': + return getattr(actor, meth)(msg) + else: + actor.close() + + # we're done so exit this task running in the subactor + + +class MethodProxy: + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: + self._portal = portal + + async def _run_method( + self, + *, + meth: str, + msg: dict, + ) -> Any: + return await self._portal.run( + _run_proxy_method, + meth=meth, + msg=msg + ) + + +def get_method_proxy(portal, target=ActorState) -> MethodProxy: + + proxy = MethodProxy(portal) + + # mock all remote methods + for name, method in inspect.getmembers( + target, predicate=inspect.isfunction + ): + if '_' == name[0]: + # skip private methods + continue + + else: + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + return proxy + + +@asynccontextmanager +async def spawn_proxy_actor(name): + + # XXX: that subactor can **not** outlive it's parent, this is SC. + async with tractor.open_nursery( + debug_mode=True, + # loglevel='info', + ) as tn: + + portal = await tn.start_actor(name, enable_modules=[__name__]) + + proxy = get_method_proxy(portal) + + yield proxy + + await proxy.close(msg=None) + + +async def main(): + # Main process/thread that spawns one sub-actor and sends messages + # to it to update it's state. + + try: + stack = AsyncExitStack() + + actors = [] + for name in ['even', 'odd']: + + actor_proxy = await stack.enter_async_context( + spawn_proxy_actor(name + '_boy') + ) + actors.append(actor_proxy) + + # spin through the actors and update their states + for i, (count, actor) in enumerate( + zip(range(100), cycle(actors)) + ): + # Here we call the locally patched `.update()` method of the + # remote instance + + # NOTE: the instance created each call here is currently + # a new object - to persist it across `portal.run()` calls + # we need to store it somewhere in memory for access by + # a new task spawned in the remote actor process. 
+ await actor.update(msg={f'msg_{i}': count}) + + # blocks here indefinitely synce we spawned "daemon actors" using + # .start_actor()`, you'll need to control-c to cancel. + + finally: + await stack.aclose() + + +if __name__ == '__main__': + trio.run(main) From 4a4a786763178b94676a5984eea84008de6a1a7c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jan 2021 14:40:55 -0500 Subject: [PATCH 15/16] Add a super basic supervisor/restart example --- examples/actors/most_basic_supervisor.py | 86 ++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 examples/actors/most_basic_supervisor.py diff --git a/examples/actors/most_basic_supervisor.py b/examples/actors/most_basic_supervisor.py new file mode 100644 index 000000000..45dad3450 --- /dev/null +++ b/examples/actors/most_basic_supervisor.py @@ -0,0 +1,86 @@ +import trio +import tractor + + +class Restart(Exception): + """Restart signal""" + + +async def sleep_then_restart(): + actor = tractor.current_actor() + print(f'{actor.uid} starting up!') + await trio.sleep(0.5) + raise Restart('This is a restart signal') + + +async def signal_restart_whole_actor(): + actor = tractor.current_actor() + print(f'{actor.uid} starting up!') + await trio.sleep(0.5) + return 'restart_me' + + +async def respawn_remote_task(portal): + # start a task in the actor at the other end + # of the provided portal, when it signals a restart, + # restart it.. + + # This is much more efficient then restarting the undlerying + # process over and over since the python interpreter runtime + # stays up and we just submit a new task to run (which + # is just the original one we submitted repeatedly. 
+ while True: + try: + await portal.run(sleep_then_restart) + except tractor.RemoteActorError as error: + if 'Restart' in str(error): + # respawn the actor task + continue + + +async def supervisor(): + + async with tractor.open_nursery() as tn: + + p0 = await tn.start_actor('task_restarter', enable_modules=[__name__]) + + # Yes, you can do this from multiple tasks on one actor + # or mulitple lone tasks in multiple subactors. + # We'll show both. + + async with trio.open_nursery() as n: + # we'll doe the first as a lone task restart in a daemon actor + for i in range(4): + n.start_soon(respawn_remote_task, p0) + + # Open another nursery that will respawn sub-actors + + # spawn a set of subactors that will signal restart + # of the group of processes on each failures + portals = [] + + # start initial subactor set + for i in range(4): + p = await tn.run_in_actor(signal_restart_whole_actor) + portals.append(p) + + # now wait on results and respawn actors + # that request it + while True: + + for p in portals: + result = await p.result() + + if result == 'restart_me': + print(f'restarting {p.channel.uid}') + await p.cancel_actor() + await trio.sleep(0.5) + p = await tn.run_in_actor(signal_restart_whole_actor) + portals.append(p) + + # this will block indefinitely so user must + # cancel with ctrl-c + + +if __name__ == '__main__': + trio.run(supervisor) From f1acbd9b84338878df241500d003d17440213370 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jan 2021 14:41:17 -0500 Subject: [PATCH 16/16] Stash the type string from remote errors --- tractor/_exceptions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 63e0d0948..bebaf6662 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -17,6 +17,8 @@ class RemoteActorError(Exception): "Remote actor exception bundled locally" def __init__(self, message, type_str, **msgdata) -> None: super().__init__(message) + self.type_str = type_str + for ns 
in [builtins, _this_mod, trio]: try: self.type = getattr(ns, type_str)