-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenv.py
More file actions
114 lines (87 loc) · 3.38 KB
/
env.py
File metadata and controls
114 lines (87 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import time
import gymnasium as gym
import numpy as np
import serial
from gymnasium import spaces
class RealPendulumEnv(gym.Env):
"""
Gymnasium environment for a real pendulum controlled by ESP32 + AS5600.
Fixed at 10 Hz control loop.
Communication protocol:
- Send: motor command as integer string + '\n' (range -255..255)
- Receive: angle in degrees as float string
"""
metadata = {"render.modes": []}
def __init__(self, port="/dev/ttyUSB0", baudrate=19200, max_episode_steps=200):
super().__init__()
# --- Serial connection ---
self.ser = serial.Serial(port, baudrate, timeout=1)
time.sleep(2.0) # give ESP32 time to reset
# --- Spaces ---
self.action_space = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
self.observation_space = spaces.Box(
low=np.array([0.0], dtype=np.float32),
high=np.array([np.pi], dtype=np.float32),
dtype=np.float32,
)
# --- State ---
self.max_episode_steps = max_episode_steps
self.step_count = 0
self.last_obs_deg = np.array([0.0], dtype=np.float32)
self.last_action = 0.0
self.last_step_time = None
self.period = 0.1 # 10 Hz
def _send_action(self, action: float):
"""Send action to ESP32 and get angle back."""
pwm = int(np.clip(action, -1, 1) * 255)
cmd = f"{pwm}\n".encode("utf-8")
self.ser.write(cmd)
line = self.ser.readline().decode("utf-8").strip()
try:
angle_deg = float(line)
except ValueError:
angle_deg = float(self.last_obs_deg[0]) # fallback to last obs
angle_deg = np.clip(angle_deg, 0.0, 180.0)
return np.array([angle_deg], dtype=np.float32)
def step(self, action):
self.step_count += 1
# --- Send command immediately ---
self.last_obs_deg = self._send_action(action[0])
# --- Enforce 10 Hz timing ---
now = time.time()
if self.last_step_time is None:
self.last_step_time = now
elapsed = now - self.last_step_time
remaining = self.period - elapsed
if remaining > 0:
time.sleep(remaining)
else:
print(f"[WARN] Step overran by {-remaining:.3f}s")
self.last_step_time = time.time()
angle_deg = float(self.last_obs_deg[0])
angle_rad = np.deg2rad(angle_deg)
# --- Reward ---
angle_rew = np.sin(angle_rad) ** 2
action_cost = (action[0] - 0.5) ** 2
continuity_cost = (self.last_action - action[0]) ** 2
reward = angle_rew - 0.5 * action_cost - 0.0 * continuity_cost
terminated = False
truncated = self.step_count >= self.max_episode_steps
if truncated:
self._send_action(0.0)
self.last_action = action[0]
return np.deg2rad(self.last_obs_deg), float(reward), terminated, truncated, {}
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.step_count = 0
# Reset motor
self._send_action(0.0)
time.sleep(1.0)
# First obs
self.last_obs_deg = self._send_action(0.0)
self.last_step_time = time.time()
return np.deg2rad(self.last_obs_deg), {}
def close(self):
if self.ser.is_open:
self._send_action(0.0)
self.ser.close()