POC: Attempting to introduce peer-to-peer chunk data exchange #728
jqdai wants to merge 22 commits into xorbitsai:main from
Conversation
# )
# self.result.status = SubtaskStatus.errored
# raise
runner_storage: RunnerStorageActor = await mo.create_actor(
Why is this creation needed? The RunnerStorageActor is already created by SubtaskRunnerManagerActor.
There used to be a 'RunnerStorage not found' error during debugging. The implementation has now been changed to raise an exception when the runner storage is not found. Thank you.
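The raise-on-missing behaviour described above can be sketched with a plain registry standing in for the actor pool. All names here (RunnerStorageRegistry, RunnerStorageNotFound) are illustrative, not the actual Xorbits API:

```python
class RunnerStorageNotFound(Exception):
    """Raised when no RunnerStorage is registered for a (band, slot_id)."""


class RunnerStorageRegistry:
    """Toy stand-in for the actor pool: maps (band, slot_id) -> storage."""

    def __init__(self):
        self._storages = {}

    def register(self, band, slot_id, storage):
        self._storages[(band, slot_id)] = storage

    def get(self, band, slot_id):
        # Instead of silently creating a new RunnerStorage on a miss
        # (the original draft behaviour), raise so the caller sees the
        # misconfiguration immediately.
        try:
            return self._storages[(band, slot_id)]
        except KeyError:
            raise RunnerStorageNotFound(
                f"no RunnerStorage for band={band!r}, slot={slot_id}"
            ) from None


registry = RunnerStorageRegistry()
registry.register("numa-0", 0, {"chunk-1": b"data"})
storage = registry.get("numa-0", 0)
```

A lookup for an unregistered slot now fails loudly instead of duplicating the actor that SubtaskRunnerManagerActor already created.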
if puts:
    try:
        runner_storage: RunnerStorageActor = await mo.actor_ref(
            uid=RunnerStorageActor.gen_uid(self._band[1], self._slot_id),
Just store the data in the current RunnerStorageActor. It's fine to get the actor ref and call the put_data API; in-process actor communication is automatically optimized into a function call.
Here we use mo.actor_ref to get the runner storage of the current subtask runner because RunnerStorage is currently not an attribute of the SubtaskProcessor or SubtaskRunnerActor class, so the processors of a runner may not have direct access to that runner's storage. I wonder whether there is a better or more direct implementation.
Additionally, I am sometimes unsure whether the address of the runner storage should be set to the processor's supervisor_address or to the address stored in the band (band[0], specifically). The current implementation takes the former, but I'm not certain about it. SubtaskRunnerActor uses self.address, which is a property that does not appear in its constructor.
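One way to sidestep both questions raised above (the actor_ref lookup and the address ambiguity) would be to hand the runner's storage to each processor at construction time. This is a hypothetical sketch, not the current Xorbits code; the class and method names mirror the discussion but are simplified:

```python
class RunnerStorage:
    """Toy per-runner storage; the real one is an actor."""

    def __init__(self, address, band, slot_id):
        # The owning runner decides the address once, at creation time,
        # so processors never have to guess supervisor_address vs band[0].
        self.address = address
        self.band = band
        self.slot_id = slot_id
        self._data = {}

    def put_data(self, key, value):
        self._data[key] = value

    def get_data(self, key):
        return self._data[key]


class SubtaskProcessor:
    """Hypothetical: the processor receives the storage directly."""

    def __init__(self, runner_storage):
        self._runner_storage = runner_storage

    def store_result(self, key, value):
        # Direct attribute access; no mo.actor_ref lookup needed.
        self._runner_storage.put_data(key, value)


storage = RunnerStorage("127.0.0.1:12345", ("127.0.0.1:12345", "numa-0"), 0)
processor = SubtaskProcessor(storage)
processor.store_result("chunk-1", b"payload")
```

With this wiring, the runner that creates both objects is the single place where the address choice is made.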
data_key_to_memory_size[store_key] = store_info.memory_size
data_key_to_object_id[store_key] = store_info.object_id
data_key_to_band[store_key] = self._band
data_key_to_slot_id[store_key] = self._slot_id
Is the slot id not stored in the meta?
In line 562 of python/xorbits/_mars/services/subtask/worker/processor.py, in _store_meta(), we directly add the current band and slot_id (self._band and self._slot_id, specifically). Thus we did not modify the original _store_mapper_data of SubtaskProcessor to store mappings from data_key to band or slot_id. Is this still necessary?
Meanwhile, in python/xorbits/_mars/services/storage/handler.py#L639, StorageHandler calls the modified meta_api.add_chunk_bands, which needs [slot_id] as an additional input that StorageHandlerActor itself does not provide. This is still unsolved.
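The _store_meta() change discussed in this thread can be summarized with a toy function (the name build_chunk_meta and the dict layout are illustrative; the real code records meta through the meta service, not a plain dict):

```python
def build_chunk_meta(store_infos, band, slot_id):
    """Toy version of the _store_meta() idea: for each stored chunk key,
    record its memory size and object id together with the (band, slot_id)
    that produced it, so peers can later locate the owning runner."""
    meta = {}
    for key, info in store_infos.items():
        meta[key] = {
            "memory_size": info["memory_size"],
            "object_id": info["object_id"],
            "band": band,       # e.g. (worker_address, "numa-0")
            "slot_id": slot_id, # identifies the runner within the band
        }
    return meta


meta = build_chunk_meta(
    {"chunk-1": {"memory_size": 8, "object_id": "obj-1"}},
    ("127.0.0.1:12345", "numa-0"),
    slot_id=2,
)
```

Because band and slot_id are attached at meta-creation time, _store_mapper_data would indeed not need its own data_key-to-band mapping, which is what the reply above suggests.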
What do these changes do?
This is a proof-of-concept draft pull request.
In Xorbits, storage is used to hold intermediate data and final results during computation, supporting various backends such as GPU memory, main memory, and disk. Currently, data produced by workers is stored and managed by a centralized storage_api. This project aims to introduce peer-to-peer data storage and communication, where each Xorbits worker holds its own data locally. A meta_api maintains the keys of data together with the address of the worker that produced them. Each subtask runner holds an independent RunnerStorage that keeps all data created in that runner and responds to requests for data it holds. When a runner needs non-local data, it looks up the meta_api to find the address of the runner that holds the data, and then fetches the data from that runner. Thus, centralized data storage is no longer necessary, which may yield a potential speedup.
Check code requirements
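The peer-to-peer flow described above (local hit, otherwise meta lookup plus fetch from the owning runner) can be sketched end to end. This is a minimal single-process model with illustrative names (MetaService, Runner); the real system communicates between actors over the network:

```python
class MetaService:
    """Toy meta_api: maps a chunk key to the (band, slot_id) that produced it."""

    def __init__(self):
        self._locations = {}

    def add_chunk_band(self, key, band, slot_id):
        self._locations[key] = (band, slot_id)

    def get_chunk_band(self, key):
        return self._locations[key]


class Runner:
    """Toy subtask runner with its own local RunnerStorage stand-in."""

    def __init__(self, band, slot_id, meta, peers):
        self.band, self.slot_id = band, slot_id
        self._meta = meta
        self._peers = peers          # shared (band, slot_id) -> Runner map
        self._storage = {}           # local RunnerStorage stand-in
        peers[(band, slot_id)] = self

    def put(self, key, value):
        # Store locally, then publish the location to the meta service.
        self._storage[key] = value
        self._meta.add_chunk_band(key, self.band, self.slot_id)

    def get(self, key):
        if key in self._storage:     # local hit: no lookup needed
            return self._storage[key]
        # Non-local data: ask meta where it lives, then fetch from the peer.
        owner = self._peers[self._meta.get_chunk_band(key)]
        return owner._storage[key]


meta, peers = MetaService(), {}
a = Runner("numa-0", 0, meta, peers)
b = Runner("numa-0", 1, meta, peers)
a.put("chunk-1", b"payload")
result = b.get("chunk-1")
```

No centralized store appears anywhere in this flow: runner b reaches runner a's data via the meta lookup alone, which is the core claim of the proposal.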