Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,6 @@ PythonPackages-AS-1.15/
llm-context/
# MyST build outputs
_build

extra-notebooks-debug/
tango_database.db
8 changes: 8 additions & 0 deletions asyncroscopy/Microscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import time
from typing import Optional


from abc import abstractmethod, ABC, ABCMeta

import numpy as np
Expand Down Expand Up @@ -50,6 +51,13 @@ class Microscope(Device, metaclass=CombinedMeta):
"DB mode: 'test/detector/scan' "
"No-DB mode: 'tango://127.0.0.1:8888/test/nodb/scan#dbase=no'",
)

corrector_device_address = device_property(
dtype=str,
doc="Tango device address for the aberration corrector settings device. "
"DB mode: 'test/hardware/corrector' "
"No-DB mode: 'tango://127.0.0.1:8888/test/nodb/corrector#dbase=no'",
)

eds_device_address = device_property(
dtype=str,
Expand Down
117 changes: 113 additions & 4 deletions asyncroscopy/ThermoDigitalTwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,36 @@
"""

import json
from pathlib import Path
import time

import numpy as np
import pyTEMlib.image_tools as it
import pyTEMlib.probe_tools as pt
import tango
from ase.io import read
from ase import Atoms
from ase.build import bulk
from tango import AttrWriteType, DevState
from tango.server import Device, attribute, device_property
from tango import AttrWriteType, DevState, DevEncoded
from tango.server import Device, attribute, device_property, command

from asyncroscopy.Microscope import Microscope
from asyncroscopy.simulation import StemSim as dg # dg means datagenerator --> needs better naming -> using directly as used in twisted version for now


HERE = Path(__file__).resolve().parent
PROJECT_ROOT = HERE.parent # removes "servers"


class ThermoDigitalTwin(Microscope):
"""
Persistent ASE-backed sample simulation with stage-coupled viewport rendering.
"""

# ------------------------------------------------------------------
# Device properties — configure in Tango DB per deployment
# ------------------------------------------------------------------

sample_seed = device_property(
dtype=int,
default_value=12345,
Expand Down Expand Up @@ -109,6 +121,7 @@ def _connect_detector_proxies(self) -> None:
"eds": self.eds_device_address,
"stage": self.stage_device_address,
"scan": self.scan_device_address,
"corrector": self.corrector_device_address
}
for name, address in addresses.items():
if not address:
Expand Down Expand Up @@ -178,7 +191,9 @@ def _sub_pix_gaussian(size: int = 11, sigma: float = 0.8, dx: float = 0.0, dy: f
g = np.exp(-(((xx + dx) ** 2 + (yy + dy) ** 2) / (2 * sigma**2)))
m = np.max(g)
return g / m if m > 0 else g

# ------------------------------------------------------------------
# simulation helpers ----> Should be put in asyncroscopy/simulation later
# ------------------------------------------------------------------
def _create_pseudo_potential(
self,
xtal: Atoms,
Expand Down Expand Up @@ -397,6 +412,9 @@ def _generate_sample(self, seed: int) -> None:
self._cached_pose_key = None
self._update_view_cache(force=True)

# ------------------------------------------------------------------
# Attribute read methods
# ------------------------------------------------------------------
def read_manufacturer(self) -> str:
"""Read method for the manufacturer attribute."""
return self._manufacturer
Expand All @@ -413,6 +431,57 @@ def write_beam_pos(self, value):
self._beam_pos_x = float(x)
self._beam_pos_y = float(y)


# ------------------------------------------------------------------
# Commands
# ------------------------------------------------------------------

@command(dtype_out=DevEncoded)#In PyTango, DevEncoded is a special Tango data type designed to send binary data + a small description string together as a single return value.
def get_scanned_image_with_aberrations(self) -> tuple[str, bytes]:
"""
Acquire a single STEM image from the named detector.

Parameters
----------
detector_name:
Name of the detector, e.g. "haadf". Must match a key in
self._detector_proxies.

Returns
-------
DevEncoded = (json_metadata, raw_bytes)
json_metadata includes: shape, dtype, dwell_time, detector,
timestamp, and any other relevant metadata.
raw_bytes is the flat numpy array bytes; reshape using shape from metadata.
"""
# check active detectors
scan = self._detector_proxies.get("scan")
corrector = self._detector_proxies.get("corrector")


# Read scan settings from the detector device
dwell_time=scan.dwell_time
imsize=scan.imsize

# Read aberration setting from the corrector
ab = corrector.get_aberrations_coeff_sim()# is a json string
self.ab = json.loads(ab)

adorned_image = self._acquire_stem_image_aberrations(imsize, dwell_time, ['haadf'])

metadata = {
"detector": 'haadf',
"shape": [imsize, imsize],
"dtype": str(adorned_image.dtype),
"dwell_time": dwell_time,
"timestamp": time.time(),
# TODO: add metadata from adorned_image.metadata when using real AutoScript
}

return json.dumps(metadata), adorned_image.tobytes()
# ------------------------------------------------------------------
# Internal acquisition helpers
# ------------------------------------------------------------------
def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray:
"""Simulate STEM image acquisition using convolutions of the pseudo-potential and electron probe."""
self._sync_stage_from_proxy()
Expand Down Expand Up @@ -457,6 +526,46 @@ def _acquire_stem_image(self, imsize: int, dwell_time: float, detector_list: lis
noisy_image += self._lowfreq_noise(noisy_image, noise_level=0.1, freq_scale=0.1, rng=rng) * blur_noise_level
return np.clip(noisy_image, 0.0, 1.0).astype(np.float32)


def _acquire_stem_image_aberrations(self, imsize: int, dwell_time: float, detector_list: list) -> np.ndarray:
"""
"""
size = imsize
beam_current = 100 # pA
ab = self.ab
ab['acceleration_voltage'] = 60e3 # eV
fov = 96 # angstroms
ab['FOV'] = fov /12 # Angstroms
ab['convergence_angle'] = 30 # mrad
ab['wavelength'] = it.get_wavelength(ab['acceleration_voltage'])
cif_path = (
PROJECT_ROOT
/ "data"
/ "cif_files"
/ "WS2_ortho.cif"
)
print("Reading CIF from:", cif_path)
xtal = read(cif_path)
xtal = xtal * (30, 20, 1)
positions = xtal.get_positions()[:, :2]
pixel_size = 0.106 # angstrom/pixel
frame = (0,fov,0,fov) # limits of the image in angstroms
potential = dg.create_pseudo_potential(xtal, pixel_size, sigma=1, bounds=frame, atom_frame=11)
probe = dg.get_probe(ab, potential)
image = dg.convolve_kernel(potential, probe)
noisy_image = dg.lowfreq_noise(image, noise_level=0.5, freq_scale=.04)

scan_time = dwell_time * size * size
counts = scan_time * (beam_current * 1e-12) / (1.602e-19)
sim_im = dg.poisson_noise(noisy_image, counts=counts)
# convert args dict

# time.sleep(1)
image = np.array(image, dtype=np.float32)
sim_im = np.array(sim_im, dtype=np.float32)
print(sim_im.shape) # TODO: the simulation is independent of size and always returns - (905, 905) image -> need to check
return sim_im

def _acquire_stem_image_advanced(
self,
detector_names: list[str],
Expand Down Expand Up @@ -588,4 +697,4 @@ def get_viewport_metadata(self) -> str:
return json.dumps(metadata)

if __name__ == "__main__":
ThermoDigitalTwin.run_server()
ThermoDigitalTwin.run_server()
16 changes: 16 additions & 0 deletions asyncroscopy/hardware/CORRECTOR.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ def reconnect(self) -> None:
"""Re-attempt the TCP connection to the CEOS server."""
self._connect()


# ------------------------------------------------------------------
# Public commands pertaining to simulation
# ------------------------------------------------------------------

@command(dtype_in=str)
def set_aberrations_coeff_sim(self, json_aberrations_string: str):
self.ab = json.loads(json_aberrations_string)
pass

@command(dtype_out=str)
def get_aberrations_coeff_sim(self):
if self.ab == None:
return
return json.dumps(self.ab)

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
Expand Down
Loading
Loading