Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/core-gpu-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ jobs:
- name: Run GPU CI
shell: bash
run: |
coverage run -m pytest ./ -m cuda --cov-config=setup.cfg --cov-report= --cov=mars
coverage xml
pytest ./ -m cuda --cov-config=setup.cfg --cov-report=xml --cov=mars

- name: Report coverage data
shell: bash
run: |
bash <(curl -s https://codecov.io/bash)
rm -rf *.coverage*
rm -rf coverage.xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: true
flags: unittests
name: codecov-gpu
verbose: true
1 change: 0 additions & 1 deletion mars/lib/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import asyncio
import contextlib
import sys

from .file import AioFileObject, AioFilesystem
Expand Down
10 changes: 9 additions & 1 deletion mars/oscar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@
setup_cluster,
wait_actor_pool_recovered,
get_pool_config,
copyto_via_buffers,
copyto_via_file_objects,
buffer_ref,
file_object_ref,
)
from .backends import allocate_strategy
from .backends.pool import MainActorPoolType
from .batch import extensible
from .core import ActorRef
from .core import (
ActorRef,
BufferRef,
FileObjectRef,
) # noqa: F401 # pylint: disable=unused-import
from .debug import set_debug_options, get_debug_options, DebugOptions
from .errors import (
ActorNotExist,
Expand Down
32 changes: 29 additions & 3 deletions mars/oscar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
# limitations under the License.

from urllib.parse import urlparse
from typing import Any, Dict, Type, Tuple
from typing import Any, Dict, Type, Tuple, List
from numbers import Number
from collections import defaultdict

from ..lib.aio import AioFileObject
from .backend import get_backend
from .context import get_context
from .core import _Actor, _StatelessActor, ActorRef
from .core import _Actor, _StatelessActor, ActorRef, BufferRef, FileObjectRef


async def create_actor(actor_cls, *args, uid=None, address=None, **kwargs) -> ActorRef:
Expand All @@ -42,11 +43,21 @@ async def actor_ref(*args, **kwargs) -> ActorRef:
return await ctx.actor_ref(*args, **kwargs)


async def kill_actor(actor_ref):
async def kill_actor(actor_ref: ActorRef):
    """Forcibly terminate the actor that ``actor_ref`` points to.

    Delegates to the current actor context; returns whatever the
    backend's ``kill_actor`` returns.
    """
    return await get_context().kill_actor(actor_ref)


def buffer_ref(address: str, buffer: Any) -> BufferRef:
    """Create a :class:`BufferRef` for ``buffer`` hosted at ``address``.

    Thin synchronous wrapper over the current context's ``buffer_ref``.
    """
    return get_context().buffer_ref(address, buffer)


def file_object_ref(address: str, fileobj: AioFileObject) -> FileObjectRef:
    """Create a :class:`FileObjectRef` for ``fileobj`` hosted at ``address``.

    Thin synchronous wrapper over the current context's ``file_object_ref``.
    """
    return get_context().file_object_ref(address, fileobj)


async def create_actor_pool(address: str, n_process: int = None, **kwargs):
if address is None:
raise ValueError("address has to be provided")
Expand All @@ -70,6 +81,21 @@ async def get_pool_config(address: str):
return await ctx.get_pool_config(address)


async def copyto_via_buffers(local_buffers: List, remote_buffer_refs: List[BufferRef]):
    """Copy each local buffer into the remote buffer its ref points to.

    Parameters
    ----------
    local_buffers
        Source buffers on this process; paired positionally with
        ``remote_buffer_refs``.
    remote_buffer_refs
        Refs (created via :func:`buffer_ref`) identifying destination
        buffers on remote pools.

    Returns whatever the backend context's ``copyto_via_buffers`` returns.
    """
    ctx = get_context()
    return await ctx.copyto_via_buffers(local_buffers, remote_buffer_refs)


async def copyto_via_file_objects(
    local_file_objects: List[AioFileObject],
    remote_file_object_refs: List[FileObjectRef],
):
    """Transfer local file objects' contents to their remote counterparts.

    ``local_file_objects`` and ``remote_file_object_refs`` are paired
    positionally; the actual copy is performed by the current context.
    """
    context = get_context()
    result = await context.copyto_via_file_objects(
        local_file_objects, remote_file_object_refs
    )
    return result


def setup_cluster(address_to_resources: Dict[str, Dict[str, Number]]):
scheme_to_address_resources = defaultdict(dict)
for address, resources in address_to_resources.items():
Expand Down
4 changes: 2 additions & 2 deletions mars/oscar/backends/communication/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,11 @@ def parse_config(cls, config: dict) -> dict:
return dict()

@implements(Channel.send)
async def send(self, message):
async def send(self, message: Any):
    # Pure delegation: forward the message to the wrapped channel.
    return await self.channel.send(message)

@implements(Channel.recv)
async def recv(self):
async def recv(self) -> Any:
    # Pure delegation: receive the next message from the wrapped channel.
    return await self.channel.recv()

async def close(self):
Expand Down
1 change: 1 addition & 0 deletions mars/oscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async def stop(self):
await asyncio.gather(
*(channel.close() for channel in self._channels if not channel.closed)
)
self._channels = []

@property
@implements(Server.stopped)
Expand Down
31 changes: 30 additions & 1 deletion mars/oscar/backends/communication/tests/test_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pytest

from .....lib.aio import AioEvent
from .....tests.core import require_cudf, require_cupy
from .....tests.core import require_cudf, require_cupy, require_ucx
from .....utils import get_next_port, lazy_import
from .. import (
Channel,
Expand All @@ -37,6 +37,7 @@
DummyClient,
Server,
UCXServer,
UCXChannel,
)
from ..ucx import UCXInitializer

Expand Down Expand Up @@ -222,6 +223,34 @@ async def test_multiprocess_cuda_comm(server_type):
await client.close()


@require_ucx
@pytest.mark.asyncio
async def test_ucx_channel():
    """Round-trip a raw buffer through a UCX server/client channel pair.

    The server echoes whatever buffer it receives; the client sends a
    ones-filled array and asserts the echo matches byte-for-byte.
    Requires a working UCX installation (``@require_ucx``).
    """
    size = 2**5

    async def handle_channel(channel: UCXChannel):
        # Echo handler: receive one buffer, then send it straight back.
        buffer = np.empty(size, dtype="u1")
        await channel.recv_buffers([buffer])
        await channel.send_buffers([buffer])

    # create server
    addr = f"127.0.0.1:{get_next_port()}"
    server = await UCXServer.create({"address": addr, "handle_channel": handle_channel})
    await server.start()
    assert isinstance(server.info, dict)

    # create client
    client = await UCXServer.client_type.connect(addr)
    buf = np.zeros(size, dtype="u1")
    buf += 1
    await client.send_buffers([buf])
    new_buf = np.empty_like(buf)
    # Receive the echo into a fresh buffer and verify it is identical.
    await client.recv_buffers([new_buf])
    np.testing.assert_array_equal(buf, new_buf)

    await server.stop()


def test_get_client_type():
assert issubclass(get_client_type("127.0.0.1"), SocketClient)
assert issubclass(get_client_type("unixsocket:///1"), UnixSocketClient)
Expand Down
76 changes: 52 additions & 24 deletions mars/oscar/backends/communication/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import cloudpickle
import numpy as np

from ....utils import lazy_import, implements, classproperty
from ....utils import lazy_import, implements, classproperty, is_cuda_buffer
from ....lib.nvutils import get_index_and_uuid, get_cuda_context
from ....serialization import deserialize
from ....serialization.aio import AioSerializer, get_header_length, BUFFER_SIZES_NAME
Expand Down Expand Up @@ -172,9 +172,7 @@ def init(ucx_config: dict):
new_environ.update(envs)
os.environ = new_environ
try:
ucp.init(
options=options, env_takes_precedence=True, blocking_progress_mode=False
)
ucp.init(options=options, env_takes_precedence=True)
finally:
os.environ = original_environ

Expand Down Expand Up @@ -246,23 +244,7 @@ async def send(self, message: Any):
compress = self.compression or 0
serializer = AioSerializer(message, compress=compress)
buffers = await serializer.run()
try:
# It is necessary to first synchronize the default stream before start
# sending We synchronize the default stream because UCX is not
# stream-ordered and syncing the default stream will wait for other
# non-blocking CUDA streams. Note this is only sufficient if the memory
# being sent is not currently in use on non-blocking CUDA streams.
if any(hasattr(buf, "__cuda_array_interface__") for buf in buffers):
# has GPU buffer
synchronize_stream(0)

async with self._send_lock:
for buffer in buffers:
if buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer) > 0:
await self.ucp_endpoint.send(buffer)
except ucp.exceptions.UCXBaseException: # pragma: no cover
self.abort()
raise ChannelClosed("While writing, the connection was closed")
return await self.send_buffers(buffers)

@implements(Channel.recv)
async def recv(self):
Expand Down Expand Up @@ -302,6 +284,41 @@ async def recv(self):
raise EOFError("Server closed already")
return deserialize(header, buffers)

async def send_buffers(self, buffers: list):
    """Send raw buffers over the UCX endpoint, one ``ucp`` send per buffer.

    Empty buffers are skipped (zero-length UCX sends are unnecessary).
    On any UCX error the channel is aborted and :class:`ChannelClosed`
    is raised.
    """
    try:
        # It is necessary to first synchronize the default stream before start
        # sending We synchronize the default stream because UCX is not
        # stream-ordered and syncing the default stream will wait for other
        # non-blocking CUDA streams. Note this is only sufficient if the memory
        # being sent is not currently in use on non-blocking CUDA streams.
        if any(is_cuda_buffer(buf) for buf in buffers):
            # has GPU buffer
            synchronize_stream(0)

        async with self._send_lock:
            for buffer in buffers:
                # BUGFIX: the previous guard
                #   `if buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer) > 0:`
                # parsed as `nbytes if hasattr(...) else (len(buffer) > 0)` and
                # only worked by accident via truthiness of `nbytes`.
                # Compute the size explicitly, then compare.
                nbytes = buffer.nbytes if hasattr(buffer, "nbytes") else len(buffer)
                if nbytes > 0:
                    await self.ucp_endpoint.send(buffer)
    except ucp.exceptions.UCXBaseException as e:  # pragma: no cover
        self.abort()
        raise ChannelClosed("While writing, the connection was closed") from e

async def recv_buffers(self, buffers: list):
    """Receive data from the peer directly into pre-allocated ``buffers``.

    One ``ucp`` receive is issued per buffer, in order, under the recv
    lock so concurrent receivers cannot interleave. Raises
    :class:`ChannelClosed` (after aborting the channel) on failure while
    the channel is open, or :class:`EOFError` if it was already closed.
    """
    async with self._recv_lock:
        try:
            for buffer in buffers:
                await self.ucp_endpoint.recv(buffer)
        except BaseException as e:  # pragma: no cover
            # BaseException is caught deliberately — see comment below.
            if not self._closed:
                # In addition to UCX exceptions, may be CancelledError or another
                # "low-level" exception. The only safe thing to do is to abort.
                self.abort()
                raise ChannelClosed(
                    f"Connection closed by writer.\nInner exception: {e!r}"
                ) from e
            else:
                raise EOFError("Server closed already")

def abort(self):
self._closed = True
if self.ucp_endpoint is not None:
Expand Down Expand Up @@ -386,7 +403,7 @@ async def serve_forever(client_ucp_endpoint: "ucp.Endpoint"):
client_ucp_endpoint, local_address=server.address
)
except ChannelClosed: # pragma: no cover
logger.debug("Connection closed before handshake completed")
logger.exception("Connection closed before handshake completed")
return

ucp_listener = ucp.create_listener(serve_forever, port=port)
Expand Down Expand Up @@ -438,6 +455,7 @@ async def stop(self):
await asyncio.gather(
*(channel.close() for channel in self._channels if not channel.closed)
)
self._channels = []
self._ucp_listener = None
self._closed.set()

Expand All @@ -452,6 +470,7 @@ class UCXClient(Client):
__slots__ = ()

scheme = UCXServer.scheme
channel: UCXChannel

@classmethod
def parse_config(cls, config: dict) -> dict:
Expand All @@ -473,9 +492,18 @@ async def connect(

try:
ucp_endpoint = await ucp.create_endpoint(host, port)
except ucp.exceptions.UCXBaseException: # pragma: no cover
raise ChannelClosed("Connection closed before handshake completed")
except ucp.exceptions.UCXBaseException as e: # pragma: no cover
raise ChannelClosed(
f"Connection closed before handshake completed, "
f"local address: {local_address}, dest address: {dest_address}"
) from e
channel = UCXChannel(
ucp_endpoint, local_address=local_address, dest_address=dest_address
)
return UCXClient(local_address, dest_address, channel)

async def send_buffers(self, buffers: list):
    # Delegate raw-buffer send to the underlying UCX channel.
    return await self.channel.send_buffers(buffers)

async def recv_buffers(self, buffers: list):
    # Delegate raw-buffer receive to the underlying UCX channel.
    return await self.channel.recv_buffers(buffers)
21 changes: 4 additions & 17 deletions mars/oscar/backends/communication/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import numpy as np

from ....serialization.aio import BUFFER_SIZES_NAME
from ....utils import lazy_import
from ....utils import lazy_import, convert_to_cupy_ndarray, is_cuda_buffer

cupy = lazy_import("cupy")
cudf = lazy_import("cudf")
Expand All @@ -27,23 +27,10 @@
CUDA_CHUNK_SIZE = 16 * 1024**2


def _convert_to_cupy_ndarray(
cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"]
) -> "cupy.ndarray":
if isinstance(cuda_buffer, cupy.ndarray):
return cuda_buffer

size = cuda_buffer.nbytes
data = cuda_buffer.__cuda_array_interface__["data"][0]
memory = cupy.cuda.UnownedMemory(data, size, cuda_buffer)
ptr = cupy.cuda.MemoryPointer(memory, 0)
return cupy.ndarray(shape=size, dtype="u1", memptr=ptr)


def write_buffers(writer: StreamWriter, buffers: List):
def _write_cuda_buffer(cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"]):
# convert cuda buffer to cupy ndarray
cuda_buffer = _convert_to_cupy_ndarray(cuda_buffer)
cuda_buffer = convert_to_cupy_ndarray(cuda_buffer)

chunk_size = CUDA_CHUNK_SIZE
offset = 0
Expand All @@ -58,7 +45,7 @@ def _write_cuda_buffer(cuda_buffer: Union["cupy.ndarray", "rmm.DeviceBuffer"]):
offset += size

for buffer in buffers:
if hasattr(buffer, "__cuda_array_interface__"):
if is_cuda_buffer(buffer):
# GPU buffer
_write_cuda_buffer(buffer)
else:
Expand All @@ -77,7 +64,7 @@ async def read_buffers(header: Dict, reader: StreamReader):
buffers.append(content)
else:
buffer = rmm.DeviceBuffer(size=buf_size)
arr = _convert_to_cupy_ndarray(buffer)
arr = convert_to_cupy_ndarray(buffer)
offset = 0
chunk_size = CUDA_CHUNK_SIZE
while offset < buf_size:
Expand Down
4 changes: 3 additions & 1 deletion mars/oscar/backends/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
ControlMessageType,
)
from .router import Router
from .transfer import TransferClient


@dataslots
Expand All @@ -49,12 +50,13 @@ class ProfilingContext:
task_id: str


class MarsActorContext(BaseActorContext):
class MarsActorContext(TransferClient, BaseActorContext):
__slots__ = ("_caller",)

support_allocate_strategy = True

def __init__(self, address: str = None):
    # Both base classes are initialized explicitly (rather than via
    # super()) because they take different constructor arguments.
    TransferClient.__init__(self)
    BaseActorContext.__init__(self, address)
    # Caller used to dispatch messages to actor pools.
    self._caller = ActorCaller()

Expand Down
Loading