diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..b1bc858 Binary files /dev/null and b/.DS_Store differ diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..749a509 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,43 @@ +name: Deploy MyST Site +on: + push: + branches: [main] + workflow_dispatch: + +permissions: + contents: read + pages: write + id-token: write + +concurrency: + group: "pages" + cancel-in-progress: false + +jobs: + deploy: + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: '20' + - name: Setup MyST + run: npm install -g mystmd + - name: Build site + env: + BASE_URL: /asyncroscopy + run: | + cd docs + myst build --html + touch _build/html/.nojekyll + - name: Upload artifact + uses: actions/upload-pages-artifact@v3 + with: + path: 'docs/_build/html' + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 diff --git a/.gitignore b/.gitignore index 9f4f845..7f3db4f 100644 --- a/.gitignore +++ b/.gitignore @@ -206,5 +206,9 @@ marimo/_static/ marimo/_lsp/ __marimo__/ +.DS_Store + PythonPackages-AS-1.15/ -llm-context/ \ No newline at end of file +llm-context/ +# MyST build outputs +_build diff --git a/README.md b/README.md index b30e6b8..1c8882c 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ See - docs/dev_guide.md ### Core installation (simulation mode) ```bash -pip install . +pip install --find-links ./stubs -e . ``` or with `uv`: diff --git a/asyncroscopy/ThermoDigitalTwin.py b/asyncroscopy/ThermoDigitalTwin.py index ed76b99..3faaf9f 100644 --- a/asyncroscopy/ThermoDigitalTwin.py +++ b/asyncroscopy/ThermoDigitalTwin.py @@ -4,41 +4,51 @@ Useful for testing and development without requiring AutoScript hardware. """ - import json -import time -import math -from typing import Optional import numpy as np -import pyTEMlib.probe_tools as pt import pyTEMlib.image_tools as it -from ase.io import read +import pyTEMlib.probe_tools as pt +import tango from ase import Atoms from ase.build import bulk - -import numpy as np -import tango -from tango import AttrWriteType, DevEncoded, DevState -from tango.server import Device, attribute, command, device_property +from tango import AttrWriteType, DevState +from tango.server import Device, attribute, device_property from asyncroscopy.Microscope import Microscope + class ThermoDigitalTwin(Microscope): """ - Detector-specific settings (dwell time, resolution) are stored in - dedicated detector devices and read via DeviceProxy at acquisition time. + Persistent ASE-backed sample simulation with stage-coupled viewport rendering. """ - # ------------------------------------------------------------------ - # Device properties — configure in Tango DB per deployment - # ------------------------------------------------------------------ - + sample_seed = device_property( + dtype=int, + default_value=12345, + doc="Seed used to generate deterministic sample geometry.", + ) + sample_particle_count = device_property( + dtype=int, + default_value=40, + doc="Number of particles in the generated sample.", + ) + sample_size_xy = device_property( + dtype=float, + default_value=6e-9, + doc="Absolute sample XY size in meters (must be > 0).", + ) + sample_size_z = device_property( + dtype=float, + default_value=6e-9, + doc="Absolute sample thickness (Z) in meters (must be > 0).", + ) + stage_move_noise_std = device_property( + dtype=float, + default_value=0.0, + doc="Gaussian move noise standard deviation in meters (applied to x,y,z).", + ) - # ------------------------------------------------------------------ - # Attributes - # ------------------------------------------------------------------ - # not finishded manufacturer = attribute( label="ThermoDigitalTwin", dtype=str, @@ -47,8 +57,8 @@ class ThermoDigitalTwin(Microscope): beam_pos = attribute( label="Beam Position", - dtype=(float,), # 1D array of floats - max_dim_x=2, # exactly 2 elements: [x, y] + dtype=(float,), + max_dim_x=2, access=AttrWriteType.READ_WRITE, unit="fractional", min_value=0.0, @@ -56,45 +66,52 @@ class ThermoDigitalTwin(Microscope): doc="Beam position as [x, y] fractional coordinates, each in range [0.0, 1.0]", ) - # ------------------------------------------------------------------ - # Initialisation - # ------------------------------------------------------------------ - def init_device(self) -> None: + """Initialize the Tango device and simulation state variables.""" Device.init_device(self) self.set_state(DevState.INIT) - - # Internal state + self._stem_mode = True - self._detector_proxies = {} + self._detector_proxies: dict[str, tango.DeviceProxy] = {} self._manufacturer = "UTKTeam" self._beam_pos_x = 0.5 self._beam_pos_y = 0.5 - self._particle_records = [] self._imsize = 512 - self._fov = 200e-10 # meters, i.e. 200 angstroms - self._stage_position = np.random.rand(3) * 1e-6 # random initial stage position in meters - + self._fov = 200e-10 # meters + self._stage_position = np.zeros(5, dtype=np.float64) # x, y, z, alpha, beta + + self._sample_atoms_base = Atoms() + self._sample_atoms_view = Atoms() + self._particle_records_base: list[dict] = [] + self._particle_records_view: list[dict] = [] + self._world_bounds_ang = { + "x_min": -1.0, + "x_max": 1.0, + "y_min": -1.0, + "y_max": 1.0, + "z_min": -1.0, + "z_max": 1.0, + } + self._cached_pose_key: tuple | None = None + self._all_sample_elements: list[str] = [] + self._connect() - + def _connect(self): """Simulate connection by connecting to detector proxies.""" self._connect_detector_proxies() + self._generate_sample(seed=int(self.sample_seed)) self.set_state(DevState.ON) - def _connect_detector_proxies(self) -> None: """Build DeviceProxy objects for each configured detector device.""" - # Extend this dict as more detectors are added - # later, we want to do this automatically, not with a dictionary. addresses: dict[str, str] = { - "eds": self.eds_device_address, + "eds": self.eds_device_address, "stage": self.stage_device_address, "scan": self.scan_device_address, } - print(addresses) for name, address in addresses.items(): - if not address: # <-- minimal fix + if not address: self.info_stream(f"Skipping {name}: no address configured") continue try: @@ -103,307 +120,472 @@ def _connect_detector_proxies(self) -> None: except tango.DevFailed as e: self.error_stream(f"Failed to connect to {name} proxy at {address}: {e}") - - # ------------------------------------------------------------------ - # Attribute read methods - # ------------------------------------------------------------------ - - def read_manufacturer(self) -> bool: - # TODO: query self._microscope.optics.mode when AutoScript available - return self._manufacturer - - - def read_beam_pos(self): - """Return beam position as [x, y] fractional coordinates.""" - return [self._beam_pos_x, self._beam_pos_y] - - # --- Write Method --- - - def write_beam_pos(self, value): - """Set beam position from [x, y] fractional coordinates.""" - x, y = value[0], value[1] - - if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0): + def _sync_stage_from_proxy(self) -> None: + """Fetch the current stage position from the stage device proxy.""" + stage = self._detector_proxies.get("stage") + if stage is None: + return + try: + self._stage_position = np.array( + [stage.x, stage.y, stage.z, stage.alpha, stage.beta], + dtype=np.float64, + ) + self._update_view_cache(force=False) + except tango.DevFailed: + self.error_stream("Failed to read stage proxy position; using internal stage state.") + + def _update_view_cache(self, force: bool = False) -> None: + """Update the viewed sample positions by applying current stage rotations and translations.""" + pose_key = tuple(np.round(self._stage_position, 12)) + if not force and pose_key == self._cached_pose_key: + return + + alpha_deg, beta_deg = self._stage_position[3], self._stage_position[4] + stage_xyz_ang = self._stage_position[:3] * 1e10 + + self._sample_atoms_view = self._sample_atoms_base.copy() + if len(self._sample_atoms_view) > 0: + if alpha_deg != 0.0: + self._sample_atoms_view.rotate(alpha_deg, 'x', center=(0, 0, 0)) + if beta_deg != 0.0: + self._sample_atoms_view.rotate(beta_deg, 'y', center=(0, 0, 0)) + self._sample_atoms_view.translate(-stage_xyz_ang) + + self._particle_records_view = [] + for rec in self._particle_records_base: + dummy = Atoms("H", positions=[rec["center"]]) + if alpha_deg != 0.0: + dummy.rotate(alpha_deg, 'x', center=(0, 0, 0)) + if beta_deg != 0.0: + dummy.rotate(beta_deg, 'y', center=(0, 0, 0)) + dummy.translate(-stage_xyz_ang) + + self._particle_records_view.append( + { + "center": dummy.positions[0], + "radius": rec["radius"], + "btype": rec["btype"], + "composition": rec["composition"], + } + ) + self._cached_pose_key = pose_key + + @staticmethod + def _sub_pix_gaussian(size: int = 11, sigma: float = 0.8, dx: float = 0.0, dy: float = 0.0) -> np.ndarray: + """Generate a 2D Gaussian kernel with sub-pixel shifts for atomic rendering.""" + coords = np.arange(size) - (size - 1) / 2.0 + xx, yy = np.meshgrid(coords, coords) + g = np.exp(-(((xx + dx) ** 2 + (yy + dy) ** 2) / (2 * sigma**2))) + m = np.max(g) + return g / m if m > 0 else g + + def _create_pseudo_potential( + self, + xtal: Atoms, + pixel_size: float, + sigma: float, + bounds: tuple[float, float, float, float], + atom_frame: int = 11, + ) -> np.ndarray: + """Project 3D atomic coordinates into a 2D density map representing pseudo-potential.""" + x_min, x_max, y_min, y_max = bounds + pixels_x = int(np.round((x_max - x_min) / pixel_size)) + pixels_y = int(np.round((y_max - y_min) / pixel_size)) + potential_map = np.zeros((pixels_x, pixels_y), dtype=np.float32) + padding = atom_frame + padded = np.pad(potential_map, padding, mode="constant", constant_values=0.0) + + if len(xtal) == 0: + return potential_map + + atomic_numbers = np.asarray(xtal.get_atomic_numbers(), dtype=np.float32) + positions = xtal.get_positions()[:, :2] + mask = ( + (positions[:, 0] >= x_min) + & (positions[:, 0] < x_max) + & (positions[:, 1] >= y_min) + & (positions[:, 1] < y_max) + ) + positions = positions[mask] + atomic_numbers = atomic_numbers[mask] + + half = atom_frame // 2 + for pos, atomic_number in zip(positions, atomic_numbers): + x_rel = (pos[0] - x_min) / pixel_size + y_rel = (pos[1] - y_min) / pixel_size + x_round = int(np.round(x_rel)) + y_round = int(np.round(y_rel)) + dx = x_rel - x_round + dy = y_rel - y_round + + atom_patch = self._sub_pix_gaussian(size=atom_frame, sigma=sigma, dx=dx, dy=dy) * atomic_number + x0 = x_round + padding - half + y0 = y_round + padding - half + x1 = x0 + atom_frame + y1 = y0 + atom_frame + if x0 < 0 or y0 < 0 or x1 > padded.shape[0] or y1 > padded.shape[1]: + continue + padded[x0:x1, y0:y1] += atom_patch + + potential = padded[padding:-padding, padding:-padding] + max_val = float(np.max(potential)) if potential.size else 0.0 + if max_val > 0: + potential = potential / max_val + return potential.astype(np.float32) + + @staticmethod + def _poisson_noise(image: np.ndarray, counts: float, rng: np.random.Generator) -> np.ndarray: + """Apply normalized Poisson noise based on simulated electron counts.""" + image = image - image.min() + total = float(image.sum()) + if total <= 0 or counts <= 0: + return np.zeros_like(image, dtype=np.float32) + image = image / total + noisy = rng.poisson(image * counts).astype(np.float32) + noisy -= noisy.min() + m = float(noisy.max()) + if m > 0: + noisy /= m + return noisy + + @staticmethod + def _lowfreq_noise( + image: np.ndarray, + noise_level: float, + freq_scale: float, + rng: np.random.Generator, + ) -> np.ndarray: + """Generate low-frequency spatial noise to simulate background drift or detector artifacts.""" + size_x, size_y = image.shape + noise = rng.normal(0, noise_level, (size_x, size_y)) + noise_fft = np.fft.fft2(noise) + x_freqs = np.fft.fftfreq(size_x) + y_freqs = np.fft.fftfreq(size_y) + freq_filter = np.outer( + np.exp(-np.square(x_freqs) / (2 * freq_scale**2)), + np.exp(-np.square(y_freqs) / (2 * freq_scale**2)), + ) + filtered_noise = np.fft.ifft2(noise_fft * freq_filter).real + filtered_noise -= filtered_noise.min() + m = float(filtered_noise.max()) + if m > 0: + filtered_noise /= m + return filtered_noise.astype(np.float32) + + def _generate_sample(self, seed: int) -> None: + """Procedurally generate the underlying base sample composed of bulk nanoparticles.""" + rng = np.random.default_rng(int(seed)) + sample_xy = float(self.sample_size_xy) * 1e10 + sample_z = float(self.sample_size_z) * 1e10 + if sample_xy <= 0.0 or sample_z <= 0.0: raise ValueError( - f"beam_pos values must be in [0.0, 1.0], got x={x}, y={y}" + "sample_size_xy and sample_size_z must both be > 0." ) - self._beam_pos_x = x - self._beam_pos_y = y - + particle_radius = 16.0 + radius_std = 2.0 + aspect_ratio = 0.4 + min_separation = 3.0 + n_particles = max(1, int(self.sample_particle_count)) + max_attempts = 500 - # ------------------------------------------------------------------ - # Internal acquisition helpers - # ------------------------------------------------------------------ - - def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray: - """ - Simulate a stem image of nanopartcles - For now, these params are hard-coded here. - Eventually, we will have a sample module (for metadata, but most useful for DigitalTwins) - """ - size = imsize - self._imsize = imsize - fov = self._fov * 1e10 # angstroms - edge_crop = 20 - beam_current = 1000 # pA? unsure - blur_noise_level = float(0.1) - pixel_size = fov / size - - # ── Nanoparticle parameters - particle_radius = 16.0 # Angstroms, mean radius - radius_std = 2.0 # randomize size a bit - aspect_ratio = 0.4 # z_radius = aspect_ratio * xy_radius (flat pancake) - min_separation = 3.0 # minimum gap between particle surfaces (Angstroms) - n_particles = 40 # how many particles to try to place - max_attempts = 500 # attempts to place each particle without overlap bulk_types = { - 'Au': bulk('Au', 'fcc', a=4.08), - 'Pt': bulk('Pt', 'fcc', a=3.92), - 'Fe': bulk('Fe', 'bcc', a=2.87), + "Au": bulk("Au", "fcc", a=4.08), + "Pt": bulk("Pt", "fcc", a=3.92), + "Fe": bulk("Fe", "bcc", a=2.87), } bulk_names = list(bulk_types.keys()) desired_angles = [(0, 0, 0), (60, 0, 0), (45, 45, 45)] - # get probe - ab = pt.get_target_aberrations("Spectra300", 200000) - ab['acceleration_voltage'] = 200e3 # eV - ab['FOV'] = fov /10 # nm - ab['convergence_angle'] = 30 # mrad - ab['wavelength'] = it.get_wavelength(ab['acceleration_voltage']) - - def sub_pix_gaussian(size=10, sigma=0.2, dx=0.0, dy=0.0): - # returns sub-pix shifted gaussian - coords = np.arange(size) - (size - 1) / 2.0 - x, y = np.meshgrid(coords, coords) - g = np.exp(-(((x + dx) ** 2 + (y + dy) ** 2) / (2 * sigma**2))) - g /= g.max() - return g - - def create_pseudo_potential(xtal, pixel_size, sigma, bounds, atom_frame=11): - # Create empty image - x_min, x_max = bounds[0], bounds[1] - y_min, y_max = bounds[2], bounds[3] - pixels_x = int((x_max - x_min) / pixel_size) - pixels_y = int((y_max - y_min) / pixel_size) - potential_map = np.zeros((pixels_x, pixels_y)) - padding = atom_frame # to avoid edge effects - potential_map = np.pad(potential_map, padding, mode='constant', constant_values=0.0) - - # Map of atomic numbers - i.e. scattering intensity - atomic_numbers = xtal.get_atomic_numbers() - positions = xtal.get_positions()[:, :2] - - mask = ((positions[:, 0] >= x_min) & (positions[:, 0] < x_max) & (positions[:, 1] >= y_min) & (positions[:, 1] < y_max)) - positions = positions[mask] - atomic_numbers = atomic_numbers[mask] - - for pos, atomic_number in zip(positions, atomic_numbers): - x,y = np.round(pos/pixel_size) - dx,dy = pos - np.round(pos) - - single_atom = sub_pix_gaussian(size=atom_frame, sigma=sigma, dx=dx, dy=dy) * atomic_number - potential_map[int(x+padding+dx-padding//2-1):int(x+padding+dx+padding//2),int(y+padding+dy-padding//2-1):int(y+padding+dy+padding//2)] += single_atom - potential_map = potential_map[padding:-padding, padding:-padding] - normalized_map = potential_map / np.max(potential_map) - - return normalized_map - - def poisson_noise(image, counts = 1e9): - # Normalize the image - image = image - image.min() - image = image / image.sum() - noisy_image = np.random.poisson(image * counts) - noisy_image = noisy_image - noisy_image.min() - noisy_image = noisy_image / noisy_image.max() - - return noisy_image - - def lowfreq_noise(image, noise_level=0.1, freq_scale=0.1): - size_x, size_y = image.shape - - noise = np.random.normal(0, noise_level, (size_x, size_y)) - noise_fft = np.fft.fft2(noise) - - # Create a frequency filter that emphasizes low frequencies - x_freqs = np.fft.fftfreq(size_x) - y_freqs = np.fft.fftfreq(size_y) - freq_filter = np.outer(np.exp(-np.square(x_freqs) / (2 * freq_scale**2)), - np.exp(-np.square(y_freqs) / (2 * freq_scale**2))) - - # Apply the frequency filter to the noise in the frequency domain - filtered_noise_fft = noise_fft * freq_filter - noisy_image = np.fft.ifft2(filtered_noise_fft).real - noisy_image = noisy_image - noisy_image.min() - noisy_image = noisy_image / noisy_image.max() - return noisy_image - - def rotation_matrix(alpha, beta, gamma): - a, b, g = np.radians([alpha, beta, gamma]) - Rz = np.array([[np.cos(a), -np.sin(a), 0], [np.sin(a), np.cos(a), 0], [0, 0, 1]]) - Ry = np.array([[np.cos(b), 0, np.sin(b)], [0, 1, 0], [-np.sin(b), 0, np.cos(b)]]) - Rx = np.array([[1, 0, 0], [0, np.cos(g), -np.sin(g)], [0, np.sin(g), np.cos(g)]]) - return Rz @ Ry @ Rx - - # ── Place particle centers with exclusion zone ────────────────────────────── - placed_centers = [] - placed_particles = [] - particle_records = [] # <-- NEW: one entry per placed particle + placed_centers: list[tuple[float, float, float]] = [] + placed_particles: list[tuple[str, np.ndarray, float, tuple[float, float, float]]] = [] + particle_records: list[dict] = [] + x_min, x_max = -sample_xy * 0.5, sample_xy * 0.5 + y_min, y_max = -sample_xy * 0.5, sample_xy * 0.5 + z_mid = 0.0 for _ in range(max_attempts * n_particles): if len(placed_particles) >= n_particles: break - - radius = np.random.normal(particle_radius, radius_std) - radius = np.clip(radius, 3.0, None) - + radius = float(np.clip(rng.normal(particle_radius, radius_std), 3.0, None)) margin = radius + 2.0 - - sample_fov = (fov*1.5, fov*1.5, fov*0.5) # angstroms - cx = np.random.uniform(margin, sample_fov[0] - margin) - cy = np.random.uniform(margin, sample_fov[1] - margin) - cz = sample_fov[2] * 0.5 + cx = float(rng.uniform(x_min + margin, x_max - margin)) + cy = float(rng.uniform(y_min + margin, y_max - margin)) + cz = z_mid too_close = False - for (px, py, pr) in placed_centers: - dist = np.sqrt((cx - px)**2 + (cy - py)**2) - if dist < (radius + pr + min_separation): + for px, py, pr in placed_centers: + if np.hypot(cx - px, cy - py) < (radius + pr + min_separation): too_close = True break if too_close: continue placed_centers.append((cx, cy, radius)) - btype = np.random.choice(bulk_names) - i = len(placed_particles) - angles = desired_angles[i] if i < len(desired_angles) else tuple(np.random.rand(3) * 360) - placed_particles.append((btype, np.array([cx, cy, cz]), radius, angles)) - - # ── record composition from the bulk unit cell ────────────────────── - symbols_in_bulk = bulk_types[btype].get_chemical_symbols() - counts_dict = {} - for s in symbols_in_bulk: - counts_dict[s] = counts_dict.get(s, 0) + 1 + btype = str(rng.choice(bulk_names)) + idx = len(placed_particles) + angles = desired_angles[idx] if idx < len(desired_angles) else tuple(rng.uniform(0, 360, size=3)) + center = np.array([cx, cy, cz], dtype=np.float64) + placed_particles.append((btype, center, radius, angles)) + + counts_dict: dict[str, int] = {} + for symbol in bulk_types[btype].get_chemical_symbols(): + counts_dict[symbol] = counts_dict.get(symbol, 0) + 1 total = sum(counts_dict.values()) - composition = {s: c / total for s, c in counts_dict.items()} # fractions - - particle_records.append({ - 'center': np.array([cx / pixel_size - edge_crop, cy / pixel_size - edge_crop]), # pixels, image coords - 'radius': radius / pixel_size, # pixels - 'btype': btype, - 'composition': composition, - }) - - # add particle records to self for later retrieval in spectrum acquisition - self._particle_records = particle_records - print(f"Placed {len(placed_particles)} particles") - - - - # ── Carve each nanoparticle from its bulk ─────────────────────────────────── - all_positions = [] - all_symbols = [] + composition = {symbol: count / total for symbol, count in counts_dict.items()} + particle_records.append( + { + "center": center, + "radius": radius, + "btype": btype, + "composition": composition, + } + ) - for (btype, center, radius, angles) in placed_particles: + all_positions: list[np.ndarray] = [] + all_symbols: list[str] = [] + for btype, center, radius, angles in placed_particles: this_bulk = bulk_types[btype] - a_lat = this_bulk.cell.lengths()[0] - z_radius = radius * aspect_ratio + a_lat = this_bulk.cell.lengths()[0] + z_radius = radius * aspect_ratio - # supercell just big enough to carve from - rep = int(radius * 2 / a_lat) + 3 + rep = int(radius * 2 / a_lat) + 3 supercell = this_bulk.repeat((rep, rep, rep)) - - R = rotation_matrix(*angles) - positions = supercell.get_positions().copy() - positions -= positions.mean(axis=0) # center at origin before rotation - positions = positions @ R.T - - # ellipsoidal mask (flat in z) + + # Center the supercell at origin + supercell.center(about=(0.0, 0.0, 0.0)) + + # Apply rotations (Z, Y, X by corresponding euler angles) + if angles[2] != 0.0: + supercell.rotate(angles[2], 'x', center=(0, 0, 0)) + if angles[1] != 0.0: + supercell.rotate(angles[1], 'y', center=(0, 0, 0)) + if angles[0] != 0.0: + supercell.rotate(angles[0], 'z', center=(0, 0, 0)) + + positions = supercell.get_positions() + r_scaled = np.sqrt( - (positions[:, 0] / radius)**2 + - (positions[:, 1] / radius)**2 + - (positions[:, 2] / z_radius)**2 + (positions[:, 0] / radius) ** 2 + + (positions[:, 1] / radius) ** 2 + + (positions[:, 2] / z_radius) ** 2 ) mask = r_scaled <= 1.0 + selected_positions = positions[mask] + center + selected_symbols = [s for s, m in zip(supercell.get_chemical_symbols(), mask) if m] + if len(selected_positions) == 0: + continue + all_positions.append(selected_positions) + all_symbols.extend(selected_symbols) + + if all_positions: + stacked_positions = np.vstack(all_positions) + self._sample_atoms_base = Atoms(symbols=all_symbols, positions=stacked_positions) + else: + self._sample_atoms_base = Atoms() + + self._particle_records_base = particle_records + self._all_sample_elements = sorted({el for rec in particle_records for el in rec["composition"]}) + self._world_bounds_ang = { + "x_min": x_min, + "x_max": x_max, + "y_min": y_min, + "y_max": y_max, + "z_min": -sample_z * 0.5, + "z_max": sample_z * 0.5, + } + self._cached_pose_key = None + self._update_view_cache(force=True) - positions = positions[mask] + center - symbols = [s for s, m in zip(supercell.get_chemical_symbols(), mask) if m] + def read_manufacturer(self) -> str: + """Read method for the manufacturer attribute.""" + return self._manufacturer - all_positions.append(positions) - all_symbols.extend(symbols) + def read_beam_pos(self): + """Read method for the beam position attribute.""" + return [self._beam_pos_x, self._beam_pos_y] - all_positions = np.vstack(all_positions) - xtal = Atoms(symbols=all_symbols, positions=all_positions) + def write_beam_pos(self, value): + """Write method for the beam position attribute.""" + x, y = value[0], value[1] + if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0): + raise ValueError(f"beam_pos values must be in [0.0, 1.0], got x={x}, y={y}") + self._beam_pos_x = float(x) + self._beam_pos_y = float(y) - # ── Rest is unchanged ──────────────────────────────────────────────────────── - edge = 2 * edge_crop * pixel_size - frame = (0, fov+edge, 0, fov+edge) - potential = create_pseudo_potential(xtal, pixel_size, sigma=1, bounds=frame, atom_frame=11) - probe, A_k, chi = pt.get_probe(ab, size+2*edge_crop, size+2*edge_crop, verbose=True) + def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray: + """Simulate STEM image acquisition using convolutions of the pseudo-potential and electron probe.""" + self._sync_stage_from_proxy() + self._imsize = imsize + self._update_view_cache(force=False) + + size = imsize + fov_ang = self._fov * 1e10 + edge_crop = 20 + beam_current = 1000.0 # pA + blur_noise_level = 0.1 + pixel_size = fov_ang / size + + frame_half = fov_ang * 0.5 + edge_ang = edge_crop * pixel_size + frame = (-frame_half - edge_ang, frame_half + edge_ang, -frame_half - edge_ang, frame_half + edge_ang) + potential = self._create_pseudo_potential( + self._sample_atoms_view, + pixel_size=pixel_size, + sigma=1.0, + bounds=frame, + atom_frame=11, + ) + + ab = pt.get_target_aberrations("Spectra300", 200000) + ab["acceleration_voltage"] = 200e3 + ab["FOV"] = fov_ang / 10.0 # nm + ab["convergence_angle"] = 30 + ab["wavelength"] = it.get_wavelength(ab["acceleration_voltage"]) + probe, _a_k, _chi = pt.get_probe(ab, size + 2 * edge_crop, size + 2 * edge_crop, verbose=False) psf_shifted = np.fft.ifftshift(probe) image = np.fft.ifft2(np.fft.fft2(potential) * np.fft.fft2(psf_shifted)) - image = np.absolute(image) + image = np.abs(image) image = image[edge_crop:-edge_crop, edge_crop:-edge_crop] - scan_time = dwell_time * size * size - counts = scan_time * (beam_current * 1e-12) / (1.602e-19) - noisy_image = poisson_noise(image, counts=counts) - blur_noise = lowfreq_noise(noisy_image, noise_level=0.1, freq_scale=.1) - noisy_image += blur_noise * blur_noise_level - sim_im = np.array(noisy_image, dtype=np.float32) - - return sim_im - + scan_time = dwell_time * size * size + counts = scan_time * (beam_current * 1e-12) / (1.602e-19) + pose_seed = int(abs(hash((tuple(np.round(self._stage_position, 10)), round(self._fov, 14), size))) % (2**32)) + rng = np.random.default_rng(pose_seed + int(self.sample_seed)) + noisy_image = self._poisson_noise(image, counts=counts, rng=rng) + noisy_image += self._lowfreq_noise(noisy_image, noise_level=0.1, freq_scale=0.1, rng=rng) * blur_noise_level + return np.clip(noisy_image, 0.0, 1.0).astype(np.float32) + + def _acquire_stem_image_advanced( + self, + detector_names: list[str], + base_resolution: int, + scan_region, + dwell_time: float, + auto_beam_blank: bool, + ) -> list[np.ndarray]: + """Perform advanced STEM acquisition returning multiple channels mapping to requested detectors.""" + im = self._acquire_stem_image(int(base_resolution), float(dwell_time), detector_names) + return [im.copy() for _ in detector_names] def _acquire_spectrum(self, detector_name: str, exposure_time: float): - px, py = self.read_beam_pos() # fractional [0, 1] - px_pix = px * self._imsize - py_pix = py * self._imsize - - for rec in self._particle_records: - cx, cy = rec['center'] # pixels - r = rec['radius'] # pixels - if (px_pix - cx)**2 + (py_pix - cy)**2 <= r**2: - raw = {el: frac for el, frac in rec['composition'].items()} - total = sum(raw.values()) - return {el: v / total + np.random.normal(0.01, 0.1) for el, v in raw.items()} - - all_elements = {el for rec in self._particle_records for el in rec['composition']} - return {el: np.abs(np.random.normal(0, 0.05)) for el in all_elements} - + """Simulate EDS spectrum acquisition at the current beam position weighted by surrounding particles.""" + self._sync_stage_from_proxy() + self._update_view_cache(force=False) + + px, py = self.read_beam_pos() + fov_ang = self._fov * 1e10 + beam_x = (px - 0.5) * fov_ang + beam_y = (py - 0.5) * fov_ang + + weighted: dict[str, float] = {} + weight_sum = 0.0 + for rec in self._particle_records_view: + cx, cy = rec["center"][:2] + radius = rec["radius"] + dist = float(np.hypot(beam_x - cx, beam_y - cy)) + if dist <= radius: + w = max(1e-6, 1.0 - (dist / radius)) + weight_sum += w + for element, frac in rec["composition"].items(): + weighted[element] = weighted.get(element, 0.0) + w * frac + + spectrum_seed = int( + abs( + hash( + ( + tuple(np.round(self._stage_position, 10)), + round(px, 6), + round(py, 6), + round(exposure_time, 6), + int(self.sample_seed), + ) + ) + ) + % (2**32) + ) + rng = np.random.default_rng(spectrum_seed) + + if weight_sum <= 0.0: + return { + element: float(np.abs(rng.normal(0.0, 0.02))) + for element in (self._all_sample_elements or ["Au", "Pt", "Fe"]) + } + + normalized = {el: val / weight_sum for el, val in weighted.items()} + noisy = {} + for element, value in normalized.items(): + noisy[element] = float(max(0.0, value + rng.normal(0.0, 0.01))) + total = sum(noisy.values()) + if total <= 0.0: + return noisy + return {el: val / total for el, val in noisy.items()} def _place_beam(self, position) -> None: - """ - sets resting beam position, [0:1] - """ + """Place the electron beam at the specified [x, y] coordinates.""" x, y = position self.write_beam_pos([x, y]) - def _set_fov(self, fov) -> None: - """set field of view in meters""" - # For the digital twin, we can just store this as a property and use it in acquisition simulations. - self._fov = fov - + """Set the field of view in meters.""" + self._fov = float(fov) def _get_stage(self): - """Return current stage position as (x, y, z, a, b) in meters.""" + """Return the current 5-axis stage position.""" + self._sync_stage_from_proxy() return self._stage_position - - def _move_stage(self, position): - """Move stage to specified position (x, y, z, a, b) in meters.""" - self.old_pos = self._stage_position - relative_move = np.array(position) - self._stage_position - # shift the particle records/ atoms object positions by this much, negative - - random_shift = np.random.normal(0, 5e-8, size=5) - self._stage_position = position + random_shift - -# ---------------------------------------------------------------------- -# Server entry point -# --------------------------------------------------------------------- + def _move_stage(self, position): + """Move the stage to the requested 5-axis vector with optional simulated precision noise.""" + if len(position) != 5: + raise ValueError("Stage position must have 5 values: [x, y, z, alpha, beta]") + target = np.array(position, dtype=np.float64) + + std = float(self.stage_move_noise_std) + if std > 0: + noise = np.zeros(5, dtype=np.float64) + noise[:3] = np.random.normal(0.0, std, size=3) + target = target + noise + + stage = self._detector_proxies.get("stage") + if stage is not None: + stage.x = float(target[0]) + stage.y = float(target[1]) + stage.z = float(target[2]) + stage.alpha = float(target[3]) + stage.beta = float(target[4]) + + self._stage_position = target + self._update_view_cache(force=False) + + def get_viewport_metadata(self) -> str: + """Return JSON-formatted metadata regarding the current simulation viewport and environment state.""" + self._sync_stage_from_proxy() + fov_ang = self._fov * 1e10 + stage_xyz_ang = self._stage_position[:3] * 1e10 + viewport = { + "x_min": float(stage_xyz_ang[0] - fov_ang * 0.5), + "x_max": float(stage_xyz_ang[0] + fov_ang * 0.5), + "y_min": float(stage_xyz_ang[1] - fov_ang * 0.5), + "y_max": float(stage_xyz_ang[1] + fov_ang * 0.5), + "z_center": float(stage_xyz_ang[2]), + } + metadata = { + "stage_position": [float(v) for v in self._stage_position], + "fov_m": float(self._fov), + "fov_angstrom": float(fov_ang), + "imsize": int(self._imsize), + "sample_seed": int(self.sample_seed), + "sample_size_xy": float(self.sample_size_xy), + "sample_size_z": float(self.sample_size_z), + "sample_size_xy_generated_angstrom": float(self._world_bounds_ang["x_max"] - self._world_bounds_ang["x_min"]), + "sample_size_z_generated_angstrom": float(self._world_bounds_ang["z_max"] - self._world_bounds_ang["z_min"]), + "viewport_world_angstrom": viewport, + "world_bounds_angstrom": self._world_bounds_ang, + "particle_count": len(self._particle_records_base), + } + return json.dumps(metadata) if __name__ == "__main__": ThermoDigitalTwin.run_server() diff --git a/asyncroscopy/mcp/mcp_server.py b/asyncroscopy/mcp/mcp_server.py index 5e4e54f..40c0735 100644 --- a/asyncroscopy/mcp/mcp_server.py +++ b/asyncroscopy/mcp/mcp_server.py @@ -14,9 +14,11 @@ import importlib import pkgutil import base64 +import json from inspect import signature, getdoc from typing import Any, Dict, Callable, Annotated from pydantic import Field +import traceback from tango import Database, DeviceProxy, CommandInfo, CmdArgType from tango.utils import ( @@ -271,9 +273,14 @@ def _normalize_command_result(out_type: CmdArgType, result: Any) -> Any: metadata_raw, payload_raw = result if isinstance(metadata_raw, bytes): - metadata = metadata_raw.decode("utf-8", errors="replace") + metadata_str = metadata_raw.decode("utf-8", errors="replace") else: - metadata = str(metadata_raw) + metadata_str = str(metadata_raw) + + try: + metadata = json.loads(metadata_str) + except (json.JSONDecodeError, TypeError): + metadata = metadata_str if isinstance(payload_raw, memoryview): payload_bytes = payload_raw.tobytes() @@ -359,11 +366,11 @@ def _build_command_docstring( in_desc = cmd_info.in_type_desc out_desc = cmd_info.out_type_desc - lines.append(f"Input Type: {self.in_type.name}") + lines.append(f"Input Type: {in_type.name}") if in_desc: lines.append(f"Input Description: {in_desc}") - lines.append(f"Output Type: {self.out_type.name}") + lines.append(f"Output Type: {out_type.name}") if out_desc: lines.append(f"Output Description: {out_desc}") @@ -423,7 +430,9 @@ def _create_wrapper( py_return_type = self._tango_type_to_python(out_type) if in_desc and in_desc.lower() not in ("uninitialised", "none", "", "uninitialized"): - arg_type = Annotated[py_type, Field(description=in_desc)] + # Sanitize description - remove newlines to prevent JSON schema breakage + clean_desc = in_desc.replace("\n", " ").strip() + arg_type = Annotated[py_type, Field(description=clean_desc)] else: arg_type = py_type @@ -431,32 +440,46 @@ def _create_wrapper( def wrapper(): result = func() return self._normalize_command_result(out_type, result) + + params = [] wrapper.__annotations__ = {"return": py_return_type} else: param_name = self._get_param_name(dev_class, command_name) - # Build the wrapper with the real param name so pydantic/FastMCP - # advertises the correct keyword in the tool schema. - ns: dict = { + ns = { "func": func, + "self": self, "arg_type": arg_type, "py_return_type": py_return_type, - "self": self, "out_type": out_type, } - # FastMCP inspects the actual parameter *name* in the function signature - # to build its JSON schema (e.g. "exposure_time" not generic "arg"). - # Python's exec() is the only way to set a runtime-determined param name - # on a function — functools.wraps and __wrapped__ don't affect introspection. - exec( + + # Use exactly the named parameter. Python naturally supports both + # wrapper(val) and wrapper(param=val) for standard named parameters. + # This satisfies strict introspection in frameworks like smolagents. + exec_str = ( f"def wrapper({param_name}: arg_type) -> py_return_type:\n" - f" result = func({param_name})\n" - f" return self._normalize_command_result(out_type, result)\n", - ns, + f" return self._normalize_command_result(out_type, func({param_name}))" ) + exec(exec_str, ns) wrapper = ns["wrapper"] - wrapper.__annotations__ = {param_name: arg_type, "return": py_return_type} + params = [ + inspect.Parameter( + param_name, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + annotation=arg_type, + ) + ] + + wrapper.__annotations__ = { + p.name: p.annotation for p in params + } + wrapper.__annotations__["return"] = py_return_type + + wrapper.__signature__ = inspect.Signature( + parameters=params, return_annotation=py_return_type + ) wrapper.__doc__ = doc # Set unique function name for FastMCP tool registration @@ -562,6 +585,7 @@ def setup(self, print_summary: bool = True): except Exception as e: if self.verbose: print(f"Failed to wrap {dev_class}.{command_name}: {e}") + traceback.print_exc() # Print all registered MCP tools if print_summary: diff --git a/docs/404.md b/docs/404.md new file mode 100644 index 0000000..50adb84 --- /dev/null +++ b/docs/404.md @@ -0,0 +1,14 @@ +# Page Not Found + +The page you're looking for doesn't exist. + +Please return to the [home page](/) or navigate using the menu on the left. + +## Common Pages + +- [Contributing Guide](./dev_guide.md) +- [Base Microscope Extension Notes](./Microscopy/modify_base_microscope.md) +- [Thermo Microscope Extension Notes](./Microscopy/modify_thermo_microscope.md) +- [Adding a Detector](./Adding_New_Hardware/add_detector.md) +- [MCP Server Documentation](./mcp_server.md) +- [Upcoming Changes](./upcoming_changes.md) diff --git a/docs/add_detector.md b/docs/Adding_New_Hardware/add_detector.md similarity index 72% rename from docs/add_detector.md rename to docs/Adding_New_Hardware/add_detector.md index 1daf6a3..4f13c54 100644 --- a/docs/add_detector.md +++ b/docs/Adding_New_Hardware/add_detector.md @@ -6,14 +6,14 @@ ```python newdet_device_address = device_property(dtype=str, default_value="test/detector/newdet") ``` -3. Register it in `_connect_detector_proxies()` - see step 4 in [modify_thermo_microscope.md](modify_thermo_microscope.md) +3. Register it in `_connect_detector_proxies()` - see step 4 in [modify_thermo_microscope.md](../Microscopy/modify_thermo_microscope.md) ```python "newdet": self.newdet_device_address, ``` - note : base class `Microscope` at asyncroscopy/Microscope.py is not the right place for this: 4. Add acquisition logic: -- see step 3 in [modify_base_microscope](modify_base_microscope.md) -- see step 5 in [modify_thermo_microscope](modify_thermo_microscope.md) +- see step 3 in [modify_base_microscope](../Microscopy/modify_base_microscope.md) +- see step 5 in [modify_thermo_microscope](../Microscopy/modify_thermo_microscope.md) 5. Add `tests/detectors/test_NEWDET.py` following `test_HAADF.py` as a template. diff --git a/docs/MCP/asyncroscopy_mcp.md b/docs/MCP/asyncroscopy_mcp.md new file mode 100644 index 0000000..620d225 --- /dev/null +++ b/docs/MCP/asyncroscopy_mcp.md @@ -0,0 +1,458 @@ +# Asyncroscopy MCP Server Implementation + +How the Asyncroscopy system bridges pyTango device control and LLM agents. + +## Overview + +The Asyncroscopy MCP server exposes microscopy hardware (via pyTango) to language models. This enables LLM-driven microscopy workflows without direct hardware knowledge. + +## Tango Architecture + +Asyncroscopy uses pyTango as the hardware abstraction layer: + +``` +LLM Agent + ↓ +MCP Client + ↓ +MCPServer (Asyncroscopy) + ↓ +Tango DeviceProxy + ↓ +Hardware (Microscope, Detectors, Stage, etc.) +``` + +PyTango decouples software from hardware through networked device objects. Each device exports commands and attributes. + +## Device Classes + +Asyncroscopy defines Tango Device subclasses for microscopy hardware: + +### Base: `Microscope` (asyncroscopy/Microscope.py) + +Core microscope control: +- `get_image()` - Acquire STEM image +- `get_spectrum()` - Acquire spectrum +- Attributes: voltage, magnification, probe_current + +### Thermo Fisher Microscope: `ThermoMicroscope` (asyncroscopy/ThermoMicroscope.py) + +Extends Microscope with multi-detector orchestration: +- Connects detector proxies (HAADF, EELS, EDS) +- Coordinates acquisition across detectors +- Manages state synchronization + +### Detectors (asyncroscopy/detectors/) + +Individual detector devices: +- `CAMERA.py` - Generic camera control +- `EDS.py` - Energy dispersive X-ray spectroscopy +- `EELS.py` - Electron energy loss spectroscopy + +### Hardware (asyncroscopy/hardware/) + +Low-level hardware control: +- `STAGE.py` - Specimen stage movement +- `CORRECTOR.py` - Aberration corrector +- `SCAN.py` - Beam scanning control + +## MCP Server Discovery + +When MCPServer starts, it performs discovery: + +```python +server = MCPServer( + name="Asyncroscopy", + tango_host="microscope.lab", + tango_port=9094, + search_packages=["asyncroscopy"] +) +server.setup() +``` + +### Discovery Steps + +1. **Connect to Tango Database** + ```python + self.database = Database(tango_host, tango_port) + ``` + +2. **List All Exported Devices** + ```python + devices = self.database.get_device_exported("*") + # Returns: ["test/microscope/1", "test/detector/eds", ...] + ``` + +3. **Filter by Class** + ```python + # Skip infrastructure (DataBase, DServer) + # Skip blocked classes (e.g., SimulatedStage) + available = [d for d in devices if not blocked(d)] + ``` + +4. **Extract Commands per Device** + ```python + dev = DeviceProxy(device_name) + commands = dev.command_list_query() + # Returns CommandInfo for each command + ``` + +5. **Build Tool Wrappers** + ```python + for cmd in commands: + if not blocked(cmd.name): + wrapper = self._create_wrapper(cmd) + tools[dev_class][cmd.name] = wrapper + ``` + +6. **Find Source Code** + ```python + cls = self._get_tango_device_class("Microscope") + # Searches asyncroscopy package for class definition + ``` + +7. **Register with MCP** + ```python + for wrapper_func in all_wrappers: + tool = Tool.from_function(wrapper_func) + mcp.add_tool(tool) + ``` + +## Example: Image Acquisition + +How an LLM acquires a microscope image through MCP: + +### Tango Device Definition + +```python +# asyncroscopy/Microscope.py +class Microscope(Device): + @command(dtype_in=int, dtype_out=str) + def get_image(self, exposure_ms: int) -> str: + """Acquire a STEM image with specified exposure time.""" + # Interact with AutoScript microscope API + img = self._acquire_stem_image(exposure_ms) + # Return as DevEncoded (binary image + metadata) + return (json.dumps(metadata), img.tobytes()) +``` + +### MCP Tool Registration + +1. Server queries Tango: `Microscope.get_image` exists +2. Extracts parameter name from source: `exposure_ms` +3. Maps Tango type to Python: `int` → `int` +4. Builds function signature: + ```python + def Microscope_get_image(exposure_ms: int) -> dict: + """Acquire a STEM image with specified exposure time. + + Tango Device Class: Microscope + Tango Command: get_image + """ + result = dev.get_image(exposure_ms) + return { + "encoding": "base64", + "metadata": metadata, + "payload": base64.b64encode(img).decode() + } + ``` + +### LLM Usage + +``` +Agent: "Acquire an image with 5ms exposure." + +MCP Server invokes: Microscope_get_image(exposure_ms=5) + +Result: { + "encoding": "base64", + "metadata": {"shape": [1024, 1024], "dtype": "uint8"}, + "payload": "iVBORw0KGgo..." +} + +Agent: "Image acquired. Dimensions: 1024×1024." +``` + +## Multi-Device Coordination + +ThermoMicroscope orchestrates multiple detector devices: + +```python +class ThermoMicroscope(Microscope): + def __init__(self, cl, name): + super().__init__(cl, name) + # Connect to detector device proxies + self.haadf = DeviceProxy(self.haadf_device_address) + self.eels = DeviceProxy(self.eels_device_address) + self.eds = DeviceProxy(self.eds_device_address) + + @command(dtype_out=str) + def acquire_multimodal(self) -> str: + """Acquire STEM image + EELS spectrum simultaneously.""" + # Coordinate detector settings + self.haadf.sync_dwell(self.scan_dwell_time) + self.eels.sync_energy_range(self.energy_range) + + # Acquire data + stem_data = self.haadf.acquire() + eels_data = self.eels.acquire() + + # Return combined result + return (metadata, combined_data) +``` + +The MCP server automatically exposes `acquire_multimodal` as a tool. + +## Type Handling + +### Scalar Types + +```python +# Tango → MCP +DevInt32 → int +DevFloat64 → float +DevBoolean → bool +DevString → str +``` + +Tool parameter validates input type before sending to hardware. + +### Array Types + +```python +# Tango → MCP +DevVarULongArray → list[int] +DevVarFloatArray → list[float] +``` + +MCP converts JSON array to typed Python list. + +### DevEncoded (Binary Data) + +Used for images, spectra, and complex structures: + +```python +# In Tango command +result = (metadata_string, image_bytes) +return result # DevEncoded type + +# In MCP tool +normalized = self._normalize_command_result(DevEncoded, result) +# Returns: +{ + "encoding": "base64", + "metadata": metadata_string, + "payload": base64_encode(image_bytes) +} +``` + +Agent receives JSON-safe structure. To use binary data, agent decodes base64: + +```python +import base64 +payload = base64.b64decode(result["payload"]) +img_array = np.frombuffer(payload, dtype=np.uint8).reshape(...) +``` + +## Configuration for Custom Hardware + +To add your own hardware to the Asyncroscopy MCP server: + +### 1. Define a Tango Device + +```python +# mymodule/my_detector.py +from tango.server import Device, command, attribute + +class MyDetector(Device): + @attribute(dtype=float) + def signal_level(self): + return self._get_signal() + + @command(dtype_in=int, dtype_out=str) + def measure(self, duration_ms: int) -> str: + """Measure signal for specified duration.""" + data = self._measure(duration_ms) + return (json.dumps(metadata), data.tobytes()) +``` + +### 2. Create a Device Server + +```python +# mymodule/server.py +from tango.server import run + +from mymodule.my_detector import MyDetector + +if __name__ == "__main__": + run([MyDetector]) +``` + +### 3. Register Device in Tango Database + +```bash +tango_admin --add-server MyServer/myinstance MyDetector test/detector/custom +``` + +### 4. Start Device Server + +```bash +python mymodule/server.py +``` + +### 5. Create MCP Server with Custom Package + +```python +from asyncroscopy.mcp.mcp_server import MCPServer + +server = MCPServer( + name="CustomMicroscopy", + tango_host="localhost", + tango_port=9094, + search_packages=["mymodule", "asyncroscopy"] +) +server.start() +``` + +The MCP server now discovers and exposes your custom detector. + +## Adding Custom MCP Tools + +Extend MCPServer to add tools that coordinate or analyze: + +```python +class EnhancedMicroscopyServer(MCPServer): + @tool() + def focus_iteratively(self, tolerance_nm: float = 1.0) -> dict: + """Automatically focus microscope using iterative approach.""" + microscope = tango.DeviceProxy("test/microscope/1") + stage = tango.DeviceProxy("test/stage/1") + + best_focus = None + best_contrast = 0 + + # Scan focus range + for z in range(-100, 101, 10): + stage.move_z(z) + img = microscope.get_image(10) # 10ms exposure + contrast = self._calculate_contrast(img) + + if contrast > best_contrast: + best_contrast = contrast + best_focus = z + + return {"best_focus_um": best_focus / 1000.0, "contrast": best_contrast} + + @tool() + def suggest_optimal_conditions(self, material: str) -> dict: + """Suggest microscopy parameters for a material.""" + params = { + "Si": {"voltage_kv": 200, "exposure_ms": 5, "magnification": 500000}, + "Au": {"voltage_kv": 100, "exposure_ms": 10, "magnification": 1000000}, + } + return params.get(material, params["Si"]) +``` + +## Blocking Commands + +Exclude dangerous or irrelevant commands: + +```python +server = MCPServer( + name="SafeMicroscopy", + tango_host="localhost", + tango_port=9094, + blocked_functions={ + "*": ["Init", "Status"], # Skip lifecycle commands + "Microscope": ["emergency_shutdown"], # Class-specific + } +) +``` + +Commands in the block list do not appear as MCP tools. + +## Debugging + +### Enable Verbose Output + +```python +server = MCPServer( + name="Debug", + tango_host="localhost", + tango_port=9094, + verbose=True +) +server.setup() +``` + +Output shows: +- Discovered devices +- Available commands per device +- Registered tools +- Tool signatures + +### Inspect Registered Tools + +```python +# After setup() +for dev_class, commands in server.tools.items(): + print(f"{dev_class}:") + for cmd_name, func in commands.items(): + print(f" • {cmd_name}: {func.__doc__}") +``` + +### Test Tool Manually + +```python +# Call the wrapped function directly +import asyncio +result = server.tools["Microscope"]["get_image"](exposure_ms=10) +print(result) +``` + +## Performance Considerations + +### Caching Device Proxies + +DeviceProxy creation is expensive. Cache them: + +```python +class OptimizedServer(MCPServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._device_cache = {} + + def _get_device(self, name): + if name not in self._device_cache: + self._device_cache[name] = DeviceProxy(name) + return self._device_cache[name] +``` + +### Avoid Blocking Operations + +Use async patterns for long-running commands: + +```python +from fastmcp.tools import tool + +class AsyncMicroscopyServer(MCPServer): + @tool() + async def acquire_mosaic_async(self, tiles_x: int, tiles_y: int) -> dict: + """Acquire mosaic (may take minutes).""" + import asyncio + results = [] + for i in range(tiles_x): + for j in range(tiles_y): + result = await asyncio.to_thread( + self._acquire_tile, i, j + ) + results.append(result) + return {"tiles": results} +``` + +## References + +- [Tango Device Programming](http://www.tango-controls.org/developers/python-api/) +- [FastMCP Tools Documentation](https://github.com/modelcontextprotocol/python-sdk) +- [MCP Protocol Specification](https://modelcontextprotocol.io/specification) +- Asyncroscopy Source: `asyncroscopy/` diff --git a/docs/MCP/building_an_mcp.md b/docs/MCP/building_an_mcp.md new file mode 100644 index 0000000..4d0b14e --- /dev/null +++ b/docs/MCP/building_an_mcp.md @@ -0,0 +1,382 @@ +# Building Custom MCP Servers + +Build Model Context Protocol servers to expose hardware or services to LLM agents. + +## Quick Start + +Create an MCP server that discovers and wraps Tango device commands: + +```python +from asyncroscopy.mcp.mcp_server import MCPServer + +server = MCPServer( + name="MyServer", + tango_host="localhost", + tango_port=9094 +) +server.start() +``` + +The server automatically: +- Connects to a Tango database +- Discovers all exported devices +- Extracts command signatures and types +- Generates MCP tools from Tango commands +- Starts an MCP server for LLM agents + +## Architecture + +### Discovery Pipeline + +``` +Tango Database + ↓ +MCPServer.__init__() → Connect to DB + ↓ +MCPServer.setup() → Query devices and commands + ↓ +_find_tools() → Extract device classes and command info + ↓ +_create_wrapper() → Convert Tango types to Python types + ↓ +MCP tool registration → Expose to LLM agents +``` + +### Type Mapping + +Tango command types are automatically mapped to Python types for MCP: + +| Tango Type | Python Type | +|-----------|------------| +| `DevVoid` | `None` | +| `DevBoolean` | `bool` | +| `DevFloat64` | `float` | +| `DevInt32` | `int` | +| `DevString` | `str` | +| `DevEncoded` | `dict` (base64) | +| Arrays | `list[type]` | + +DevEncoded binary data is base64-encoded: + +```json +{ + "encoding": "base64", + "metadata": "header_string", + "payload": "base64_encoded_data" +} +``` + +## Configuration + +### Block Lists + +Exclude specific commands or device classes from MCP exposure: + +```python +server = MCPServer( + name="MyServer", + tango_host="localhost", + tango_port=9094, + blocked_classes=["DataBase", "DServer", "MyUnwantedClass"], + blocked_functions={ + "*": ["Init", "Status"], # Global blocks + "Microscope": ["Connect", "Disconnect"], # Per-class blocks + }, + search_packages=["mymodule", "asyncroscopy"] +) +``` + +### Parameters + +- **`name`** (str): Display name for the server +- **`tango_host`** (str): Tango database hostname +- **`tango_port`** (int): Tango database port +- **`blocked_classes`** (list[str]): Tango classes to skip (default: `["DataBase", "DServer"]`) +- **`blocked_functions`** (dict | list): Commands to exclude + - List: Applied globally to all classes + - Dict: Map class names to command lists; `"*"` for global blocks +- **`search_packages`** (list[str]): Python packages to search for Tango Device source code (default: `["asyncroscopy"]`) +- **`verbose`** (bool): Print discovery and registration progress (default: `True`) + +## Adding Custom Tools + +Extend the MCPServer class to add custom tools, resources, and prompts: + +### Custom Tool + +```python +from fastmcp.tools import tool + +class MyMCPServer(MCPServer): + @tool() + def calculate_exposure(self, gain: int) -> float: + """Calculate optimal exposure based on gain.""" + return gain * 2.5 +``` + +### Custom Resource + +```python +from fastmcp.resources import resource + +class MyMCPServer(MCPServer): + @resource("config://system") + def get_system_config(self) -> str: + """Return system configuration.""" + return "TIMEOUT=30\nRETRIES=3" +``` + +### Custom Prompt + +```python +from fastmcp.prompts import prompt + +class MyMCPServer(MCPServer): + @prompt() + def focus_procedure(self, voltage: float) -> str: + """Prompt template for focusing procedure.""" + return f"Please focus the beam at {voltage}kV and report any drift." +``` + +Custom tools, resources, and prompts are automatically registered during `setup()`. + +## Implementation Details + +### Source-Level Introspection + +The server introspects Tango Device source code to improve tool descriptions: + +1. Search for the Device subclass in `search_packages` +2. Extract the actual parameter names (not generic `arg`) +3. Pull docstrings from the command method +4. Build rich descriptions for LLM agents + +```python +class Microscope(Device): + @command(dtype_in=int, dtype_out=float) + def acquire_image(self, exposure_ms: int) -> float: + """Acquire a STEM image with specified exposure.""" + # implementation +``` + +The MCP tool parameter is named `exposure_ms` (from source), not `arg`. + +### Wrapper Generation + +Commands are wrapped with proper Python signatures using `exec()`: + +```python +def _create_wrapper(self, func, cmd_info, command_name, dev_class): + # Resolve parameter name from source + param_name = self._get_param_name(dev_class, command_name) + + # Map Tango type to Python type + py_type = self._tango_type_to_python(cmd_info.in_type) + + # Generate function with proper signature + exec(f"def wrapper({param_name}: py_type): ...") + + # Normalize DevEncoded output to JSON + return self._normalize_command_result(...) +``` + +### Tool Registration + +Tools are registered via FastMCP: + +```python +tool_obj = Tool.from_function(wrapped_func) +self.mcp.add_tool(tool_obj) +``` + +Each tool has: +- Parameter names from source code +- Type hints for validation +- Full docstrings with Tango metadata +- Proper return type annotations + +## Transport Options + +### Stdio (Default) + +For local connections to agents: + +```python +server.start() +``` + +Uses JSON-RPC over stdin/stdout. Connect agents directly to the process. + +### HTTP + +For remote access: + +```python +server.start_http(host="0.0.0.0", port=8000) +``` + +Exposes MCP tools via HTTP. Agents connect via HTTP client. + +## Usage Example + +### Standalone Server + +```python +from asyncroscopy.mcp.mcp_server import MCPServer + +# Create server +server = MCPServer( + name="Microscope", + tango_host="microscope.lab.local", + tango_port=9094, + blocked_functions={"*": ["Init"]}, + verbose=True +) + +# Add custom tools +from fastmcp.tools import tool + +class CustomServer(MCPServer): + @tool() + def suggest_parameters(self, voltage: int) -> str: + """Suggest imaging parameters for given voltage.""" + return f"For {voltage}kV: gain=50, exposure=10ms" + +# Create instance and start +custom = CustomServer( + name="Microscope", + tango_host="localhost", + tango_port=9094 +) +custom.start() +``` + +### With Custom Device Classes + +```python +class MyServer(MCPServer): + @tool() + def list_available_modes(self) -> list[str]: + """List available imaging modes.""" + return ["STEM", "BF", "DF", "HAADF"] + +# Ensure your Device subclasses are importable +import mymodule # Contains MyDevice(Device) + +server = MyServer( + name="MyServer", + tango_host="localhost", + tango_port=9094, + search_packages=["mymodule"] +) +server.start() +``` + +## Testing + +### Unit Tests + +Test custom tools in isolation: + +```python +def test_custom_tool(): + server = MyServer(name="Test", tango_host="localhost", tango_port=9094) + result = server.suggest_parameters(voltage=200) + assert "gain" in result +``` + +### Integration Tests + +Test with a real Tango database: + +```python +import tango + +def test_mcp_with_tango(): + # Start Tango services (database, device server) + # Create MCPServer + server = MCPServer( + name="Test", + tango_host="localhost", + tango_port=9094 + ) + server.setup() + + # Verify tools are registered + assert len(server.tools) > 0 +``` + +See `tests/test_mcp_server.py` for full test examples. + +## Advanced Patterns + +### Conditional Tool Registration + +```python +class ConditionalServer(MCPServer): + def setup(self): + super().setup() + + # Add tools based on discovered devices + available_devices = self.list_devices() + if any("EDS" in d for d in available_devices): + self.mcp.add_tool(self.analyze_eds_spectrum) +``` + +### Dynamic Blocking + +```python +class FilterServer(MCPServer): + def _is_blocked_function(self, dev_class, command_name): + # Custom logic: block based on runtime state + if command_name.startswith("_"): + return True + return super()._is_blocked_function(dev_class, command_name) +``` + +### Multi-Device Coordination + +```python +class CoordinatedServer(MCPServer): + @tool() + def acquire_multimodal(self, exposure_ms: int) -> dict: + """Acquire STEM + EDS simultaneously.""" + stem_dev = tango.DeviceProxy("test/microscope/stem") + eds_dev = tango.DeviceProxy("test/detector/eds") + + stem_data = stem_dev.command_inout("AcquireImage", exposure_ms) + eds_data = eds_dev.command_inout("Acquire", exposure_ms) + + return {"stem": stem_data, "eds": eds_data} +``` + +## Troubleshooting + +### No Devices Discovered + +Check: +1. Tango database is running: `tango_host` and `tango_port` are correct +2. Devices are exported: `server.list_devices()` returns non-empty list +3. Devices are not blocked: Check `blocked_classes` and `blocked_functions` + +### Tools Not Appearing in Agent + +Check: +1. `setup()` is called before agent connects +2. Tool registration succeeded (check verbose output) +3. Tool wrapper function has valid signature +4. Parameter types are JSON-serializable + +### Source Introspection Not Working + +Verify: +1. Device subclass is in a module under `search_packages` +2. Module is importable: `import mymodule` works +3. Class name matches Tango class name exactly +4. Source code has proper type hints + +## References + +- [Tango Python Documentation](http://www.tango-controls.org/developers/python-api/) +- [FastMCP Documentation](https://github.com/modelcontextprotocol/python-sdk) +- [MCP Specification](https://modelcontextprotocol.io/) diff --git a/docs/mcp_server.md b/docs/MCP/mcp_server.md similarity index 80% rename from docs/mcp_server.md rename to docs/MCP/mcp_server.md index a266ca6..a960f9c 100644 --- a/docs/mcp_server.md +++ b/docs/MCP/mcp_server.md @@ -2,7 +2,27 @@ The [`MCPServer`](../asyncroscopy/mcp/mcp_server.py#L43) is a bridge between a Tango control system and the Model Context Protocol (MCP). It allows LLM agents to interact directly with hardware by exposing Tango device commands as MCP tools. + --- +## What is MCP? + +The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) is an open standard +that lets AI agents connect to +external tools and data sources through a unified interface. Think of it as a standardized +API layer specifically designed for LLM interactions. + +MCP defines three core primitives that servers can expose: +- **Tools**: Executable functions the LLM can invoke (like Tango device commands) +- **Resources**: Read-only data sources (like configuration or device state) +- **Prompts**: Reusable message templates that guide LLM interactions + +## Why MCP + Asyncroscopy? + +Asyncroscopy uses PyTango to control microscope hardware. The MCPServer automatically +discovers every Tango device command in your system and exposes them as MCP tools. This +means an LLM agent can query detector settings, move the stage, acquire images, and +adjust beam parameters — all through natural language. +The Asyncroscopy MCP server exposes microscopy hardware (via pyTango) to language models. This enables LLM-driven microscopy workflows without direct hardware knowledge. ## Core Functionality @@ -16,7 +36,7 @@ On startup, the server queries the Tango Database to find all exported devices v Each discovered Tango command is wrapped into an MCP tool via [`_create_wrapper()`](../asyncroscopy/mcp/mcp_server.py#L393). The server: - Maps Tango types to Python types for parameter validation — see [`_tango_type_to_python()`](../asyncroscopy/mcp/mcp_server.py#L247). - **Source-Level Introspection**: Uses [`_get_tango_device_class()`](../asyncroscopy/mcp/mcp_server.py#L294) to search specified Python packages (default: `["asyncroscopy"]`) and `inspect` to retrieve real parameter names via [`_get_param_name()`](../asyncroscopy/mcp/mcp_server.py#L372) and docstrings via [`_get_docstring()`](../asyncroscopy/mcp/mcp_server.py#L330) from the source implementation. -- Handles `DevEncoded` data by base64-encoding the payload for [JSON-safe transport](#data-transport--encoding) — see [`_normalize_command_result()`](../asyncroscopy/mcp/mcp_server.py#L264). +- Handles `DevEncoded` data by base64-encoding the payload for [JSON-safe transport](#data-transport-encoding) — see [`_normalize_command_result()`](../asyncroscopy/mcp/mcp_server.py#L264). --- @@ -79,6 +99,7 @@ class MyCustomMCPServer(MCPServer): --- +(data-transport-encoding)= ## Data Transport & Encoding Tango `DevEncoded` commands often return binary data (like images). The [`_normalize_command_result()`](../asyncroscopy/mcp/mcp_server.py#L264) method normalizes these into a standard JSON structure: diff --git a/docs/modify_base_microscope.md b/docs/Microscopy/modify_base_microscope.md similarity index 100% rename from docs/modify_base_microscope.md rename to docs/Microscopy/modify_base_microscope.md diff --git a/docs/modify_thermo_microscope.md b/docs/Microscopy/modify_thermo_microscope.md similarity index 100% rename from docs/modify_thermo_microscope.md rename to docs/Microscopy/modify_thermo_microscope.md diff --git a/scripts/README_db_mode.md b/docs/Operation/tango_db_mode.md similarity index 99% rename from scripts/README_db_mode.md rename to docs/Operation/tango_db_mode.md index 6ea32a8..16e46ae 100644 --- a/scripts/README_db_mode.md +++ b/docs/Operation/tango_db_mode.md @@ -1,4 +1,5 @@ +## Tango Database Mode ``` workflow: diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..013cfb8 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,22 @@ +# Asyncroscopy Documentation + +Welcome to the Asyncroscopy documentation site. + +![Schematic of the functional project structure](../architecture.png) + +Use this site to navigate contributor guidance, microscope architecture notes, hardware extension docs, MCP server references, and upcoming changes. + +## Start Here + +- [Contributing Guide](dev_guide.md): project engineering principles and pull request expectations. +- [Base Microscope Extension Notes](Microscopy/modify_base_microscope.md): where to add or change core microscope behavior. +- [Thermo Microscope Extension Notes](Microscopy/modify_thermo_microscope.md): detector integration and orchestration guidance. + +## Hardware and Integrations + +- [Add a Detector](Adding_New_Hardware/add_detector.md): detector onboarding checklist and implementation notes. +- [MCP Server Documentation](MCP/mcp_server.md): how Tango commands are exposed to MCP-compatible agents. + +## Roadmap + +- [Upcoming Changes](upcoming_changes.md): planned areas and deferred work. diff --git a/docs/myst.yml b/docs/myst.yml new file mode 100644 index 0000000..82974dc --- /dev/null +++ b/docs/myst.yml @@ -0,0 +1,51 @@ +# See docs at: https://mystmd.org/guide/frontmatter +version: 1 +project: + id: 00c581f9-e9f0-4bb6-94fa-773c880a9953 + title: Asyncroscopy Documentation + description: Coordination platform for asynchronous microscopy and spectroscopy data analysis + keywords: [imaging, microscopy, spectroscopy, TEM] + authors: + - name: Austin Houston + email: ahoust17@vols.utk.edu + - name: Utkarsh Pratiush + email: upratius@vols.utk.edu + - name: Dominick Pelaia + email: dpelaia@utk.edu + - name: Levi Dunn + email: ldunn21@utk.edu + - name: Gerd Duscher + email: gduscher@utk.edu + github: https://github.com/whittlegears/asyncroscopy + toc: + - file: index.md + - title: Contributor Guides + children: + - file: dev_guide.md + - title: Microscopy + children: + - file: Microscopy/modify_base_microscope.md + - file: Microscopy/modify_thermo_microscope.md + - title: Adding New Hardware + children: + - file: Adding_New_Hardware/add_detector.md + - title: MCP Server + children: + - file: MCP/mcp_server.md + - file: MCP/building_an_mcp.md + - file: MCP/asyncroscopy_mcp.md + - title: Operation + children: + - file: Operation/tango_db_mode.md + - file: upcoming_changes.md + +site: + template: book-theme + nav: [] + options: + logo: null + 404: 404.md + +publish: + - id: asyncroscopy + url: https://whittlegears.mystmd.org \ No newline at end of file diff --git a/docs/thermo_digital_twin.md b/docs/thermo_digital_twin.md new file mode 100644 index 0000000..526db25 --- /dev/null +++ b/docs/thermo_digital_twin.md @@ -0,0 +1,41 @@ +# ThermoDigitalTwin + +`ThermoDigitalTwin` is the simulated version of the `ThermoMicroscope`. +It provides realistic-enough image and spectrum behavior for development, testing, and demos without requiring AutoScript or hardware. + +## How it works + +1. On startup, the twin generates a **persistent synthetic sample** (deterministic from seed). +2. Stage pose (`x, y, z, alpha, beta`) defines the current viewport into that sample. +3. `get_scanned_image()` renders an image from the current pose and FoV. +4. `get_spectrum("eds")` estimates composition at the current beam position using the same projected sample state. + +This means moving the stage navigates the sample, and revisiting the same pose can reproduce the same view when stage noise is disabled. + +## Available features + +- Persistent sample per device session +- Deterministic sample generation via seed +- Stage-coupled navigation in **XY + Z + alpha/beta tilt** +- Beam-position-dependent spectrum simulation +- Configurable stage move noise +- Viewport metadata reporting +- Manual sample regeneration with a new seed + +## Key properties + +- `sample_seed`: controls deterministic sample generation +- `sample_particle_count`: controls synthetic particle count +- `sample_extent_scale`: controls sample XY size relative to FoV +- `stage_move_noise_std`: adds Gaussian perturbation to stage moves + +## Key commands + +- `move_stage([x, y, z, alpha, beta])` +- `get_stage()` +- `set_fov(fov)` +- `get_scanned_image()` +- `place_beam([x, y])` +- `get_spectrum("eds")` +- `get_viewport_metadata()` +- `regenerate_sample(seed)` \ No newline at end of file diff --git a/docs/upcoming_changes.md b/docs/upcoming_changes.md index fc1443b..a571792 100644 --- a/docs/upcoming_changes.md +++ b/docs/upcoming_changes.md @@ -1,4 +1,6 @@ -## Not yet implemented +## Future Changes - **EELS, EDS, CEOS** detector device files are present as stubs. -- **Async acquisition** — deferred; architecture is designed to adopt gevent-based async later without structural changes. \ No newline at end of file +- **Async acquisition** — deferred; architecture is designed to adopt gevent-based async later without structural changes. +- **Create benchmarking for model +- **Test UNET Blob Finder and integrate ML as a device to call diff --git a/notebooks/9_MCP_Server_Tutorial.ipynb b/notebooks/9_MCP_Server_Tutorial.ipynb index 2fb00df..aed6cfe 100644 --- a/notebooks/9_MCP_Server_Tutorial.ipynb +++ b/notebooks/9_MCP_Server_Tutorial.ipynb @@ -16,7 +16,7 @@ "metadata": {}, "source": [ "> [!IMPORTANT]\n", - "> **Note:** This notebook is purely for educational demonstration. In practice, the MCP server should always be run as a standalone process via the CLI (such as through [`start_mcp_with_twin.py`](../scripts/start_mcp_with_twin.py) or [`start_mcp_server_cli.py`](../scripts/start_mcp_server_cli.py)). Running it here blocks the notebook and can lead to event-loop conflicts." + " \"> **Note:** This notebook is purely for educational demonstration. In practice, the MCP server should always be run as a standalone process via the CLI (such as through [`run_mcp_and_devices.py`](../scripts/run_mcp_and_devices.py) or [`start_mcp_server_cli.py`](../scripts/start_mcp_server_cli.py)). Running it here blocks the notebook and can lead to event-loop conflicts.\"" ] }, { diff --git a/pyproject.toml b/pyproject.toml index eb6bc09..c32fab3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "asyncroscopy" version = "0.1.0" description = "Asyncroscopy: asynchronous control framework for STEM automation" readme = "README.md" -requires-python = ">3.11" +requires-python = ">=3.12" dependencies = [ "colorama>=0.4.6", "defusedxml>=0.7.1", @@ -33,6 +33,7 @@ dependencies = [ "autoscript-tem-toolkit", "thermoscientific-logging", "aicspylibczi>=3.3.1", + "jupyter-book>=2.0.0" ] [tool.setuptools] diff --git a/scripts/run_mcp_and_devices.py b/scripts/run_mcp_and_devices.py new file mode 100755 index 0000000..0ecca6c --- /dev/null +++ b/scripts/run_mcp_and_devices.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python +""" +Interactive CLI to start a Tango DB, register and run specified Tango devices, +and then start the MCP server. +""" + +from __future__ import annotations + +import os +import socket +import subprocess +import sys +import tempfile +import time +import importlib +import contextlib +from typing import Callable +from pathlib import Path + +from tango import Database, DbDevInfo, DeviceProxy +from tango.server import device_property + +# Add the parent directory to Python path to allow asyncroscopy imports +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from asyncroscopy.mcp.mcp_server import MCPServer + +class ManagedProcess: + def __init__(self, name: str, process: subprocess.Popen[str]): + self.name = name + self.process = process + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + stop_process(self) + +def log_stderr(msg: str) -> None: + """Log to stderr to avoid corrupting MCP stdout JSON-RPC.""" + print(msg, file=sys.stderr, flush=True) + +def find_free_port(host: str = "127.0.0.1") -> tuple[int, socket.socket]: + """Find a free port and return (port, socket).""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, 0)) + return int(sock.getsockname()[1]), sock + +def make_env(tango_host: str) -> dict[str, str]: + env = os.environ.copy() + env["TANGO_HOST"] = tango_host + env["PYTHONUNBUFFERED"] = "1" + return env + +def retry_until_success[T](func: Callable[[], T], timeout: float, error_msg: str) -> T: + start = time.monotonic() + last_error = None + while time.monotonic() - start < timeout: + try: + return func() + except Exception as exc: + last_error = exc + time.sleep(0.1) + raise TimeoutError(f"{error_msg} Last error: {last_error}") + +def wait_for_process_output( + proc: subprocess.Popen[str], + expected_text: str, + timeout: float, + process_name: str, +) -> None: + start = time.monotonic() + seen_lines = [] + + while time.monotonic() - start < timeout: + if proc.poll() is not None: + output = "\n".join(seen_lines) + raise RuntimeError( + f"{process_name} exited early with code {proc.returncode}.\n" + f"Observed output:\n{output}" + ) + + line = proc.stdout.readline() if proc.stdout else "" + if line: + line = line.rstrip("\n") + seen_lines.append(line) + log_stderr(f"[{process_name}] {line}") + if expected_text in line: + return + else: + time.sleep(0.05) + + output = "\n".join(seen_lines) + raise TimeoutError( + f"Timed out waiting for '{expected_text}' from {process_name}.\n" + f"Observed output:\n{output}" + ) + +def wait_for_device_ready(device_name: str, timeout: float = 10.0) -> None: + def check(): + dev = DeviceProxy(device_name) + dev.ping() + retry_until_success(check, timeout, f"Timed out waiting for device '{device_name}' readiness.") + +def connect_database(host: str, port: int, timeout: float = 10.0) -> Database: + def check(): + db = Database(host, port) + db.get_db_host() + return db + return retry_until_success(check, timeout, f"Timed out connecting to Tango DB at {host}:{port}.") + +def stop_process(managed: ManagedProcess, timeout: float = 5.0) -> None: + proc = managed.process + if proc.poll() is not None: + return + + log_stderr(f"[shutdown] terminating {managed.name} (pid={proc.pid})") + proc.terminate() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + log_stderr(f"[shutdown] killing {managed.name} (pid={proc.pid})") + proc.kill() + proc.wait(timeout=timeout) + +def start_background_process(name: str, args: list[str], env: dict[str, str], expected_text: str, timeout: float, cwd: Path | None = None) -> ManagedProcess: + log_stderr(f"[startup] Starting {name}...") + proc = subprocess.Popen( + args, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + managed = ManagedProcess(name=name, process=proc) + try: + wait_for_process_output(proc, expected_text, timeout, name) + return managed + except Exception: + stop_process(managed) + raise + +def get_class_from_name(class_name: str): + """Dynamically find a Tango Device class in the asyncroscopy package.""" + module_paths_to_try = [ + f"asyncroscopy.{class_name}", + f"asyncroscopy.hardware.{class_name}", + f"asyncroscopy.detectors.{class_name}" + ] + + for mod_path in module_paths_to_try: + try: + module = importlib.import_module(mod_path) + if hasattr(module, class_name): + return getattr(module, class_name) + except ImportError: + continue + + raise ValueError(f"Could not find class {class_name} in asyncroscopy") + +def add_device(db: Database, server: str, classname: str, device: str): + info = DbDevInfo() + info.server = server + info._class = classname + info.name = device + db.add_device(info) + print(f"Registered '{device}' (Server: {server}, Class: {classname})") + +def get_required_subdevices(class_name: str) -> list[dict[str, str]]: + """Parses the class device properties to find sub-devices.""" + cls = get_class_from_name(class_name) + sub_devices = [] + for attr_name in dir(cls): + if attr_name.endswith("_device_address"): + prop = getattr(cls, attr_name) + if isinstance(prop, device_property): + prefix = attr_name.split("_device_address")[0] + sub_devices.append({ + "class": prefix.upper(), + "attr_name": attr_name, + "prefix": prefix.lower() + }) + return sub_devices + +def cleanup_old_servers_for_class(class_name: str) -> None: + """ + Cleanup of Tango servers related to a device class. + Only runs if TANGO_HOST is already in the environment. + """ + if "TANGO_HOST" not in os.environ: + log_stderr(f"[startup] No TANGO_HOST set; skipping stale-server cleanup (no old DB to query)") + return + + try: + db = Database() + related_classes = {class_name} + + try: + for sub in get_required_subdevices(class_name): + related_classes.add(sub["class"]) + except Exception as exc: + log_stderr(f"[startup] Could not inspect related device classes for {class_name}: {exc}") + + for related_class in sorted(related_classes): + servers = list(db.get_server_list(f"{related_class}/*")) + log_stderr(f"[startup] Existing {related_class} servers: {servers}") + + for server in servers: + try: + dserver_name = f"dserver/{server}" + log_stderr(f"[startup] Killing stale server via {dserver_name}") + dserver = DeviceProxy(dserver_name) + dserver.command_inout("Kill") + except Exception as exc: + log_stderr(f"[startup] Failed to kill {server}: {exc}") + except Exception as exc: + log_stderr(f"[startup] Skipping stale-server cleanup: {exc}") + +def main(): + host = "127.0.0.1" + python_bin = sys.executable + port_socket: socket.socket | None = None + + try: + class_name = input("Enter the name of the main class to register (e.g., 'ThermoMicroscope'): ") + + cleanup_old_servers_for_class(class_name) + + port, port_socket = find_free_port(host) + tango_host = f"{host}:{port}" + + print(f"[config] TANGO_HOST={tango_host}") + os.environ["TANGO_HOST"] = tango_host + + env = make_env(tango_host) + + with contextlib.ExitStack() as stack: + db_dir_obj = stack.enter_context(tempfile.TemporaryDirectory(prefix="tango-db-run-")) + db_path = Path(db_dir_obj) + + # Start Tango DB + db_proc = start_background_process( + name="tango-db", + args=[python_bin, "-m", "tango.databaseds.database", "2"], + env=env, + expected_text="Ready to accept request", + timeout=30.0, + cwd=db_path + ) + stack.enter_context(db_proc) + + # Tango DB is now running and bound to the port; we can release the port-finder socket + if port_socket is not None: + port_socket.close() + port_socket = None + + db = connect_database(host, port) + device_name = f"test/{class_name.lower()}/1" + server_name = f"{class_name}/{class_name.lower()}_instance" + + # Setup main device + add_device(db, server_name, class_name, device_name) + + # Setup and Start Sub-devices + sub_devices = get_required_subdevices(class_name) + for sub in sub_devices: + sub_classname = sub["class"] + sub_device = f"test/{sub['prefix']}/1" + sub_server = f"{sub_classname}/{sub['prefix']}_instance" + + # Register the sub-device and link it to the main device + add_device(db, sub_server, sub_classname, sub_device) + db.put_device_property(device_name, {sub['attr_name']: [sub_device]}) + print(f" property: {sub['attr_name']} = {sub_device}") + + # Start sub-device server + cls = get_class_from_name(sub_classname) + proc = start_background_process( + name=f"device-{cls.__module__.split('.')[-1]}", + args=[python_bin, "-m", cls.__module__, f"{sub['prefix']}_instance"], + env=env, + expected_text="Ready to accept request", + timeout=30.0 + ) + stack.enter_context(proc) + wait_for_device_ready(sub_device, timeout=10.0) + log_stderr(f"[startup] {sub_classname} device is fully accessible") + + # Start Main Device + main_cls = get_class_from_name(class_name) + main_proc = start_background_process( + name=f"device-{main_cls.__module__.split('.')[-1]}", + args=[python_bin, "-m", main_cls.__module__, f"{class_name.lower()}_instance"], + env=env, + expected_text="Ready to accept request", + timeout=30.0 + ) + stack.enter_context(main_proc) + + wait_for_device_ready(device_name, timeout=10.0) + log_stderr(f"[startup] Main {class_name} device is fully accessible") + + # Start MCPServer + log_stderr("[startup] Initializing MCP Server...") + server = MCPServer( + name=f"MCPServer_{class_name}", + tango_host=host, + tango_port=port, + blocked_classes=["DataBase", "DServer"], + verbose=False, + ) + + mcp_host = input("Enter MCP server host (default: 127.0.0.1): ").strip() or "127.0.0.1" + mcp_port_input = input("Enter MCP server port (default: 8000): ").strip() + mcp_port = int(mcp_port_input) if mcp_port_input else 8000 + + log_stderr(f"[startup] Starting MCP Server at {mcp_host}:{mcp_port}. Exported devices: {server.list_devices()}") + server.start_http(host=mcp_host, port=mcp_port) + + except KeyboardInterrupt: + log_stderr("\n[shutdown] KeyboardInterrupt received. Shutting down...") + except Exception as exc: + log_stderr(f"\n[error] Fatal error: {exc}") + sys.exit(1) + finally: + if port_socket is not None: + port_socket.close() + +if __name__ == "__main__": + main() diff --git a/scripts/start_mcp_with_twin.py b/scripts/start_mcp_with_twin.py deleted file mode 100644 index 64512a9..0000000 --- a/scripts/start_mcp_with_twin.py +++ /dev/null @@ -1,282 +0,0 @@ -#!/usr/bin/env python -""" -start_mcp_with_twin.py - -Starts the Tango DB, registers and runs the ThermoDigitalTwin, -and then starts the MCP server. This is useful for running the -MCP server along with a mock twin in a single command. -""" - -from __future__ import annotations - -import os -import socket -import subprocess -import sys -import tempfile -import time -from pathlib import Path - -import tango - -sys.path.insert(0, str(Path(__file__).resolve().parents[1])) - -from asyncroscopy.mcp.mcp_server import MCPServer - -class ManagedProcess: - def __init__(self, name: str, process: subprocess.Popen[str]): - self.name = name - self.process = process - -def log_stderr(msg: str) -> None: - """Log to stderr to avoid corrupting MCP stdout JSON-RPC.""" - print(msg, file=sys.stderr, flush=True) - -def find_free_port(host: str = "127.0.0.1") -> int: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.bind((host, 0)) - return int(sock.getsockname()[1]) - -def make_env(tango_host: str) -> dict[str, str]: - env = os.environ.copy() - env["TANGO_HOST"] = tango_host - env["PYTHONUNBUFFERED"] = "1" - return env - -def wait_for_process_output( - proc: subprocess.Popen[str], - expected_text: str, - timeout: float, - process_name: str, -) -> None: - start = time.monotonic() - seen_lines = [] - - while time.monotonic() - start < timeout: - if proc.poll() is not None: - output = "\n".join(seen_lines) - raise RuntimeError( - f"{process_name} exited early with code {proc.returncode}.\n" - f"Observed output:\n{output}" - ) - - line = proc.stdout.readline() if proc.stdout else "" - if line: - line = line.rstrip("\n") - seen_lines.append(line) - log_stderr(f"[{process_name}] {line}") - if expected_text in line: - return - else: - time.sleep(0.05) - - output = "\n".join(seen_lines) - raise TimeoutError( - f"Timed out waiting for '{expected_text}' from {process_name}.\n" - f"Observed output:\n{output}" - ) - -def wait_for_device_ready(device_name: str, timeout: float = 10.0) -> None: - start = time.monotonic() - last_error = None - - while time.monotonic() - start < timeout: - try: - dev = tango.DeviceProxy(device_name) - dev.ping() - return - except Exception as exc: - last_error = exc - time.sleep(0.1) - - raise TimeoutError( - f"Timed out waiting for device '{device_name}' readiness. " - f"Last error: {last_error}" - ) - -def start_tango_db(python_bin: str, tango_host: str, work_dir: Path, timeout: float) -> ManagedProcess: - log_stderr("[startup] Starting Tango DB...") - env = make_env(tango_host) - proc = subprocess.Popen( - [python_bin, "-m", "tango.databaseds.database", "2"], - cwd=work_dir, - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - ) - managed = ManagedProcess(name="tango-db", process=proc) - wait_for_process_output(proc, "Ready to accept request", timeout, managed.name) - return managed - -def register_device( - db_host: str, - db_port: int, - server_name: str, - class_name: str, - device_name: str, -) -> None: - db = tango.Database(db_host, db_port) - info = tango.DbDevInfo() - info.server = server_name - info._class = class_name - info.name = device_name - - try: - db.add_device(info) - log_stderr(f"[register] registered: {device_name}") - except tango.DevFailed: - log_stderr(f"[register] device already present: {device_name}") - -def set_twin_properties( - db_host: str, - db_port: int, - twin_device_name: str, - scan_device_name: str, - eds_device_name: str, -) -> None: - db = tango.Database(db_host, db_port) - db.put_device_property( - twin_device_name, - { - "scan_device_address": [scan_device_name], - "eds_device_address": [eds_device_name], - }, - ) - log_stderr(f"[register] property set: {twin_device_name}.scan_device_address={scan_device_name}") - log_stderr(f"[register] property set: {twin_device_name}.eds_device_address={eds_device_name}") - -def start_scan_device(python_bin: str, tango_host: str, instance: str, timeout: float) -> ManagedProcess: - log_stderr("[startup] Starting SCAN device server...") - env = make_env(tango_host) - proc = subprocess.Popen( - [python_bin, "-m", "asyncroscopy.hardware.SCAN", instance], - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - ) - managed = ManagedProcess(name="scan-device", process=proc) - wait_for_process_output(proc, "Ready to accept request", timeout, managed.name) - return managed - -def start_eds_device(python_bin: str, tango_host: str, instance: str, timeout: float) -> ManagedProcess: - log_stderr("[startup] Starting EDS device server...") - env = make_env(tango_host) - proc = subprocess.Popen( - [python_bin, "-m", "asyncroscopy.detectors.EDS", instance], - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - ) - managed = ManagedProcess(name="eds-device", process=proc) - wait_for_process_output(proc, "Ready to accept request", timeout, managed.name) - return managed - -def start_digital_twin(python_bin: str, tango_host: str, instance: str, timeout: float) -> ManagedProcess: - log_stderr("[startup] Starting ThermoDigitalTwin...") - env = make_env(tango_host) - proc = subprocess.Popen( - [python_bin, "-m", "asyncroscopy.ThermoDigitalTwin", instance], - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - ) - managed = ManagedProcess(name="digital-twin", process=proc) - wait_for_process_output(proc, "Ready to accept request", timeout, managed.name) - return managed - -def stop_process(managed: ManagedProcess, timeout: float = 5.0) -> None: - proc = managed.process - if proc.poll() is not None: - return - - log_stderr(f"[shutdown] terminating {managed.name} (pid={proc.pid})") - proc.terminate() - try: - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - log_stderr(f"[shutdown] killing {managed.name} (pid={proc.pid})") - proc.kill() - proc.wait(timeout=timeout) - -def main(): - host = "127.0.0.1" - port = find_free_port(host) - tango_host = f"{host}:{port}" - python_bin = sys.executable - - log_stderr(f"[config] TANGO_HOST={tango_host}") - os.environ["TANGO_HOST"] = tango_host - - managed_procs = [] - db_dir_obj = tempfile.TemporaryDirectory(prefix="tango-db-run-") - db_path = Path(db_dir_obj.name) - - try: - # Start Tango DB - db_proc = start_tango_db(python_bin, tango_host, db_path, timeout=30.0) - managed_procs.append(db_proc) - - # Register SCAN + EDS + Twin devices and set required twin properties - twin_instance = "mcp_instance" - twin_device_name = "test/digitaltwin/1" - scan_instance = "scan_instance" - scan_device_name = "test/scan/1" - eds_instance = "eds_instance" - eds_device_name = "test/eds/1" - register_device(host, port, f"SCAN/{scan_instance}", "SCAN", scan_device_name) - register_device(host, port, f"EDS/{eds_instance}", "EDS", eds_device_name) - register_device(host, port, f"ThermoDigitalTwin/{twin_instance}", "ThermoDigitalTwin", twin_device_name) - set_twin_properties(host, port, twin_device_name, scan_device_name, eds_device_name) - - # Start detector devices first so twin can resolve its proxies at init - scan_proc = start_scan_device(python_bin, tango_host, scan_instance, timeout=30.0) - managed_procs.append(scan_proc) - wait_for_device_ready(scan_device_name, timeout=10.0) - log_stderr("[startup] SCAN device is fully accessible") - - eds_proc = start_eds_device(python_bin, tango_host, eds_instance, timeout=30.0) - managed_procs.append(eds_proc) - wait_for_device_ready(eds_device_name, timeout=10.0) - log_stderr("[startup] EDS device is fully accessible") - - # Start Twin - twin_proc = start_digital_twin(python_bin, tango_host, twin_instance, timeout=30.0) - managed_procs.append(twin_proc) - - # Wait for Twin to be fully responsive - wait_for_device_ready(twin_device_name, timeout=10.0) - log_stderr("[startup] ThermoDigitalTwin is fully accessible") - - # Start MCPServer - log_stderr("[startup] Initializing MCP Server...") - server = MCPServer( - name="MCPServer_Twin", - tango_host=host, - tango_port=port, - blocked_classes=["DataBase", "DServer"], - verbose=False, - ) - - log_stderr(f"[startup] Starting MCP Server. Exported devices: {server.list_devices()}") - server.start_http() - - except KeyboardInterrupt: - log_stderr("\n[shutdown] KeyboardInterrupt received. Shutting down...") - except Exception as exc: - log_stderr(f"\n[error] Fatal error: {exc}") - sys.exit(1) - finally: - for proc in reversed(managed_procs): - stop_process(proc) - db_dir_obj.cleanup() - -if __name__ == "__main__": - main() diff --git a/tests/conftest.py b/tests/conftest.py index 8725085..8c1f940 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,12 +13,13 @@ import numpy as np import pytest -import os import tango from tango.test_context import MultiDeviceTestContext # Import device classes to test from asyncroscopy.hardware.SCAN import SCAN +from asyncroscopy.hardware.STAGE import STAGE +from asyncroscopy.detectors.EDS import EDS from asyncroscopy.ThermoDigitalTwin import ThermoDigitalTwin from asyncroscopy.ThermoMicroscope import ThermoMicroscope @@ -45,6 +46,24 @@ def tango_ctx(): } ], }, + { + "class": EDS, + "devices": [ + { + "name": "test/nodb/eds", + "properties": {}, + } + ], + }, + { + "class": STAGE, + "devices": [ + { + "name": "test/nodb/stage", + "properties": {}, + } + ], + }, { "class": ThermoDigitalTwin, "devices": [ @@ -52,6 +71,8 @@ def tango_ctx(): "name": "test/nodb/twin", "properties": { "scan_device_address": "test/nodb/scan", + "eds_device_address": "test/nodb/eds", + "stage_device_address": "test/nodb/stage", }, } ], @@ -90,6 +111,16 @@ def twin_proxy(tango_ctx): return tango.DeviceProxy(tango_ctx.get_device_access("test/nodb/twin")) +@pytest.fixture(scope="session") +def eds_proxy(tango_ctx): + return tango.DeviceProxy(tango_ctx.get_device_access("test/nodb/eds")) + + +@pytest.fixture(scope="session") +def stage_proxy(tango_ctx): + return tango.DeviceProxy(tango_ctx.get_device_access("test/nodb/stage")) + + @pytest.fixture(scope="session") def thermo_proxy(tango_ctx): return tango.DeviceProxy(tango_ctx.get_device_access("test/nodb/thermomicroscope")) @@ -116,4 +147,4 @@ def fake_acquire(self, imsize: int, dwell_time: float, detector_list: list): ThermoDigitalTwin, "_acquire_stem_image", fake_acquire, - ) \ No newline at end of file + ) diff --git a/tests/test_digital_twin.py b/tests/test_digital_twin.py index 8ec65e7..3939355 100644 --- a/tests/test_digital_twin.py +++ b/tests/test_digital_twin.py @@ -35,3 +35,38 @@ def test_get_image_returns_valid_data(self, twin_proxy: tango.DeviceProxy, patch def test_unknown_detector_raises(self, twin_proxy: tango.DeviceProxy): with pytest.raises(tango.DevFailed): twin_proxy.get_spectrum("void") + + def test_stage_navigation_changes_and_restores_view( + self, + twin_proxy: tango.DeviceProxy, + scan_proxy: tango.DeviceProxy, + ): + scan_proxy.imsize = 64 + scan_proxy.dwell_time = 1e-6 + + twin_proxy.move_stage([0.0, 0.0, 0.0, 0.0, 0.0]) + _, raw_a = twin_proxy.get_scanned_image() + + twin_proxy.move_stage([8e-9, -7e-9, 0.0, 0.0, 0.0]) + _, raw_b = twin_proxy.get_scanned_image() + assert raw_a != raw_b + + twin_proxy.move_stage([0.0, 0.0, 0.0, 0.0, 0.0]) + _, raw_a_again = twin_proxy.get_scanned_image() + assert raw_a == raw_a_again + + def test_spectrum_is_repeatable_at_same_pose_and_beam( + self, + twin_proxy: tango.DeviceProxy, + eds_proxy: tango.DeviceProxy, + ): + eds_proxy.exposure_time = 0.05 + twin_proxy.move_stage([0.0, 0.0, 0.0, 0.0, 0.0]) + twin_proxy.place_beam([0.45, 0.55]) + + _meta_1, raw_1 = twin_proxy.get_spectrum("eds") + _meta_2, raw_2 = twin_proxy.get_spectrum("eds") + + spec_1 = json.loads(raw_1.decode("utf-8")) + spec_2 = json.loads(raw_2.decode("utf-8")) + assert spec_1 == spec_2 diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 7bcb90d..fd725e1 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -374,7 +374,7 @@ def test_devencoded_payload_is_json_safe(self) -> None: assert isinstance(normalized, dict) assert normalized["encoding"] == "base64" - assert normalized["metadata"] == '{"shape":[2,2],"dtype":"uint8"}' + assert normalized["metadata"] == {"shape": [2, 2], "dtype": "uint8"} assert isinstance(normalized["payload"], str) assert base64.b64decode(normalized["payload"]) == b"\x00\x01\xff\x10" @@ -383,4 +383,51 @@ def test_tango_types_map_to_python(self) -> None: assert MCPServer._tango_type_to_python(tango.CmdArgType.DevString) is str assert MCPServer._tango_type_to_python(tango.CmdArgType.DevVarDoubleArray) == list[float] assert MCPServer._tango_type_to_python(tango.CmdArgType.DevUChar) == np.uint8 - assert MCPServer._tango_type_to_python(tango.CmdArgType.DevEncoded) is dict \ No newline at end of file + assert MCPServer._tango_type_to_python(tango.CmdArgType.DevEncoded) is dict + +class TestMCPToolInvocation: + def test_wrapper_supports_positional_and_keyword(self, monkeypatch) -> None: + # Mock Database and DeviceProxy to avoid connection errors + # Must patch where it is used (imported) + monkeypatch.setattr("asyncroscopy.mcp.mcp_server.Database", lambda host, port: None) + + # Mock objects for wrapping + def mock_func(val): + return val + + cmd_info = type('CommandInfo', (), { + 'in_type': tango.CmdArgType.DevString, + 'out_type': tango.CmdArgType.DevString, + 'in_type_desc': 'some string', + 'out_type_desc': 'result' + }) + + server = MCPServer("test", "localhost", 1234) + wrapper = server._create_wrapper(mock_func, cmd_info, "MyCmd", "MyClass") + + # 1. Positional call + assert wrapper("hello") == "hello" + + # 2. Keyword call with correct name + import inspect + sig = inspect.signature(wrapper) + param_name = list(sig.parameters.keys())[0] + assert wrapper(**{param_name: "world"}) == "world" + + def test_void_wrapper_supports_no_args(self, monkeypatch) -> None: + monkeypatch.setattr("asyncroscopy.mcp.mcp_server.Database", lambda host, port: None) + + def mock_func(): + return "done" + + cmd_info = type('CommandInfo', (), { + 'in_type': tango.CmdArgType.DevVoid, + 'out_type': tango.CmdArgType.DevString, + 'in_type_desc': '', + 'out_type_desc': '' + }) + + server = MCPServer("test", "localhost", 1234) + wrapper = server._create_wrapper(mock_func, cmd_info, "VoidCmd", "MyClass") + + assert wrapper() == "done" \ No newline at end of file