-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprep.py
More file actions
333 lines (281 loc) · 15.8 KB
/
prep.py
File metadata and controls
333 lines (281 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#!python
import os
import mph
import utils
import numpy as np
from scipy.interpolate import griddata
import argparse
class COMSOLProcessor:
def __init__(self, filename, time_range=None):
self.filename = filename
print("MPH: start", flush=True)
self.client = mph.start()
print(f"MPH: loading {self.filename}", flush=True)
self.model = self.client.load(self.filename)
self.geom = self.model.java.geom('geom1')
self.spf = self.model.java.physics('spf')
self.current_units = utils.get_current_units(self)
self.time_range = time_range
def extract_boundary_conditions(self):
"""Extract boundary condition data such as inlets and outlets."""
data = { 'inlets': [], 'outlets': [], 'walls': [] }
edges = self.get_edges_vertices()
for tag in self.spf.feature().tags():
feature = self.spf.feature(tag)
bc_type = feature.getType()
selection = feature.selection()
entities = selection.entities()
for entity in entities:
entity_index = entity - 1
if entity_index < 0 or entity_index >= len(edges):
continue
coords = edges[entity_index]
if bc_type == 'InletBoundary':
data['inlets'].append({'tag': tag, 'coordinates': coords})
elif bc_type == 'OutletBoundary':
data['outlets'].append({'tag': tag, 'coordinates': coords})
elif bc_type == 'WallBC':
data['walls'].append({'tag': tag, 'coordinates': coords})
return data
def get_edges_vertices(self):
"""Retrieve all the edges' coordinates for the model."""
result = []
vertex_coords = self.geom.getVertexCoord()
start_indices, end_indices = self.geom.getStartEnd()
for i in range(len(start_indices)):
start_index = start_indices[i] - 1
end_index = end_indices[i] - 1
start_vertex_coords = (vertex_coords[0][start_index], vertex_coords[1][start_index])
end_vertex_coords = (vertex_coords[0][end_index], vertex_coords[1][end_index])
result.append((start_vertex_coords, end_vertex_coords))
return result
def _get_domain_boundaries(self):
"""Get the minimum and maximum coordinates of the domain"""
vertex_coords = self.geom.getVertexCoord()
x_coords = vertex_coords[0]
y_coords = vertex_coords[1]
return np.min(x_coords), np.max(x_coords), np.min(y_coords), np.max(y_coords)
def read_from_tables(self, pressure_table_tag='tbl1', velocity_table_tag='tbl2'):
try:
# Get the tables
try:
pressure_table = self.model.java.result().table(pressure_table_tag)
velocity_table = self.model.java.result().table(velocity_table_tag)
except Exception as e:
print(f"Error accessing tables: {str(e)}", flush=True)
print(f"Available tables: {[t.tag() for t in self.model.java.result().tables()]}", flush=True)
return None
# Get data from tables - False means don't include headers
try:
pressure_data = pressure_table.getTableData(False)
velocity_data = velocity_table.getTableData(False)
except Exception as e:
print(f"Error reading table data: {str(e)}", flush=True)
return None
# Convert string data to float arrays
try:
# Convert Java String objects to Python strings and then to floats
pressure_data = [[float(str(val)) for val in row] for row in pressure_data]
velocity_data = [[float(str(val)) for val in row] for row in velocity_data]
# Then convert to numpy arrays
pressure_data = np.array(pressure_data)
velocity_data = np.array(velocity_data)
print(f"Pressure table shape: {pressure_data.shape}")
print(f"Velocity table shape: {velocity_data.shape}")
except Exception as e:
print(f"Error converting table data to numeric format: {str(e)}", flush=True)
return None
# Validate data structure
if pressure_data.shape[1] < 3 or velocity_data.shape[1] < 4:
print(f"Invalid table structure. Pressure table shape: {pressure_data.shape}, Velocity table shape: {velocity_data.shape}", flush=True)
return None
# Extract coordinates and values
x = pressure_data[:, 0] # x coordinates
y = pressure_data[:, 1] # y coordinates
p = pressure_data[:, 2] # pressure values
u = velocity_data[:, 2] # u velocity
v = velocity_data[:, 3] # v velocity
# Validate data consistency
if not np.allclose(x, velocity_data[:, 0]) or not np.allclose(y, velocity_data[:, 1]):
print("Warning: Coordinate mismatch between pressure and velocity tables", flush=True)
return None
# Calculate velocity magnitude
velocity_magnitude = np.sqrt(u**2 + v**2)
# Get unique x and y coordinates
x_unique = np.unique(x)
y_unique = np.unique(y)
# Reshape data into 2D arrays
nx = len(x_unique)
ny = len(y_unique)
# Create meshgrid for reshaping
X, Y = np.meshgrid(x_unique, y_unique)
# Reshape the data
u = u.reshape(ny, nx)
v = v.reshape(ny, nx)
p = p.reshape(ny, nx)
velocity_magnitude = velocity_magnitude.reshape(ny, nx)
# Get domain boundaries
x_min, x_max = np.min(x_unique), np.max(x_unique)
y_min, y_max = np.min(y_unique), np.max(y_unique)
domain_info = {
'x_min': float(x_min), 'x_max': float(x_max),
'y_min': float(y_min), 'y_max': float(y_max),
'nx': nx, 'ny': ny,
'units': 'm', # Tables store data in SI units
'cells_density': nx / (x_max - x_min) # Calculate cells density
}
return {
'x': x_unique, 'y': y_unique,
'u': u, 'v': v, 'pressure': p, 'velocity_magnitude': velocity_magnitude,
'grid_size': (nx, ny),
'u_flat': u.flatten(), 'v_flat': v.flatten(), 'p_flat': p.flatten(), 'velocity_magnitude_flat': velocity_magnitude.flatten(),
'domain_info': domain_info
}
except Exception as e:
print(f"Error reading from tables: {str(e)}", flush=True)
return None
def extract_solution_data(self, solve, cells_density, use_tables=False, pressure_table_tag='tbl1', velocity_table_tag='tbl2'):
"""Extract solution data using the direct evaluate method from COMSOL or from existing tables.
Args:
solve (bool): Whether to solve the model
cells_density (float): Number of cells per unit length
use_tables (bool): Whether to read data from existing tables instead of evaluating
pressure_table_tag (str): Tag of the pressure table
velocity_table_tag (str): Tag of the velocity table
"""
if use_tables:
solution_data = self.read_from_tables(pressure_table_tag, velocity_table_tag)
if solution_data is not None:
return solution_data
print("Failed to read from tables, falling back to direct evaluation", flush=True)
if solve:
if self.time_range:
try:
study = self.model.java.study('std1')
step = study.feature('time')
step.set('tlist', self.time_range)
print(f"Set time range for study: {self.time_range}", flush=True)
except Exception as e:
print(f"Warning: Could not set time range: {e}", flush=True)
print('Solving... ', flush=True)
self.model.solve()
print('Solved', flush=True)
# Always fetch the last iteration even if we don't solve
iteration_to_fetch = 'last'
u, v = self.model.evaluate(['u', 'v'], unit='m/s', inner=iteration_to_fetch)
p = self.model.evaluate('p', unit='mPa', inner=iteration_to_fetch)
velocity = np.stack((u, v), axis=-1)
velocity_magnitude = np.linalg.norm(velocity, axis=-1)
if self.current_units == 'm':
u = utils.convert_to_mm(self, u, 'velocity')
v = utils.convert_to_mm(self, v, 'velocity')
velocity_magnitude = utils.convert_to_mm(self, velocity_magnitude, 'velocity')
points = self.model.evaluate(['x', 'y'], unit='m', inner=iteration_to_fetch)
x, y = points[0], points[1]
if self.current_units == 'm':
x = utils.convert_to_mm(self, x, 'length')
y = utils.convert_to_mm(self, y, 'length')
x_min, x_max, y_min, y_max = self._get_domain_boundaries()
x_min = utils.convert_to_mm(self, x_min, 'length')
x_max = utils.convert_to_mm(self, x_max, 'length')
y_min = utils.convert_to_mm(self, y_min, 'length')
y_max = utils.convert_to_mm(self, y_max, 'length')
nx = int((x_max - x_min) * cells_density)
ny = int((y_max - y_min) * cells_density)
x_sorted = np.linspace(x_min, x_max, nx)
y_sorted = np.linspace(y_min, y_max, ny)
grid_x, grid_y = np.meshgrid(x_sorted, y_sorted)
points = np.column_stack((x, y))
u = griddata(points, u, (grid_x, grid_y), method='nearest')
v = griddata(points, v, (grid_x, grid_y), method='nearest')
p = griddata(points, p, (grid_x, grid_y), method='nearest')
velocity_magnitude = griddata(points, velocity_magnitude, (grid_x, grid_y), method='nearest')
u_flat = u.flatten()
v_flat = v.flatten()
p_flat = p.flatten()
velocity_magnitude_flat = velocity_magnitude.flatten()
domain_info = { 'x_min': float(x_min), 'x_max': float(x_max), 'y_min': float(y_min), 'y_max': float(y_max), 'nx': nx, 'ny': ny, 'units': 'mm', 'cells_density': cells_density }
return {
'x': x_sorted, 'y': y_sorted,
'u': u, 'v': v, 'pressure': p, 'velocity_magnitude': velocity_magnitude,
'grid_size': (nx, ny),
'u_flat': u_flat, 'v_flat': v_flat, 'p_flat': p_flat, 'velocity_magnitude_flat': velocity_magnitude_flat,
'domain_info': domain_info
}
def process(self, output_dir, solve=False, cells_density=1, use_tables=False, pressure_table_tag='tbl1', velocity_table_tag='tbl2'):
"""Export velocity and pressure data in a format suitable for cellular automata
Args:
output_dir (str): Directory to save the exported data
solve (bool): Whether to solve the model
cells_density (float): Number of cells per unit length
use_tables (bool): Whether to read data from existing tables instead of evaluating
pressure_table_tag (str): Tag of the pressure table
velocity_table_tag (str): Tag of the velocity table
"""
os.makedirs(output_dir, exist_ok=True)
solution_data = self.extract_solution_data(solve, cells_density, use_tables=use_tables,
pressure_table_tag=pressure_table_tag,
velocity_table_tag=velocity_table_tag)
if solution_data is None:
print("Failed to extract solution data. Cannot proceed with export.", flush=True)
return None
boundary_conditions = self.extract_boundary_conditions()
# Convert boundary conditions to mm
for bc_type in boundary_conditions:
for bc in boundary_conditions[bc_type]:
start_coords = bc['coordinates'][0]
end_coords = bc['coordinates'][1]
bc['coordinates'] = (
(utils.convert_to_mm(self, start_coords[0], 'length'),
utils.convert_to_mm(self, start_coords[1], 'length')),
(utils.convert_to_mm(self, end_coords[0], 'length'),
utils.convert_to_mm(self, end_coords[1], 'length'))
)
# Ensure all pressure values are non-negative by finding min value and shifting
min_pressure = np.min(solution_data['p_flat'])
if min_pressure < 0:
pressure_offset = abs(min_pressure)
solution_data['pressure'] = solution_data['pressure'] + pressure_offset
solution_data['p_flat'] = solution_data['p_flat'] + pressure_offset
print(f"Shifted pressure values by +{pressure_offset} to ensure non-negative values", flush=True)
else:
pressure_offset = 0
print("No pressure shift needed, all values already non-negative", flush=True)
# Save velocity and pressure data
np.savetxt(os.path.join(output_dir, 'u.txt'), solution_data['u_flat'], fmt='%.12e')
np.savetxt(os.path.join(output_dir, 'v.txt'), solution_data['v_flat'], fmt='%.12e')
np.savetxt(os.path.join(output_dir, 'pressure.txt'), solution_data['p_flat'], fmt='%.12e')
np.savetxt(os.path.join(output_dir, 'velocity_magnitude.txt'), solution_data['velocity_magnitude_flat'], fmt='%.12e')
# Save pressure offset information
with open(os.path.join(output_dir, 'pressure_offset.txt'), 'w') as f:
f.write(f"{pressure_offset}\n")
# Save domain info
with open(os.path.join(output_dir, 'domain_info.txt'), 'w') as f:
for key, value in solution_data['domain_info'].items():
f.write(f"{key} {value}\n")
# Save boundary conditions
with open(os.path.join(output_dir, 'boundary_conditions.txt'), 'w') as f:
for bc_type, bc_list in boundary_conditions.items():
f.write(f"{bc_type}\n")
for bc in bc_list:
tag = str(bc['tag']) if hasattr(bc['tag'], 'toString') else bc['tag']
f.write(f"tag {tag}\n")
start_coords = bc['coordinates'][0]
end_coords = bc['coordinates'][1]
f.write(f"coords {start_coords[0]} {start_coords[1]} {end_coords[0]} {end_coords[1]}\n")
f.write("\n")
print(f"Data exported to {output_dir}", flush=True)
return { 'domain_info': solution_data['domain_info'], 'solution_data': solution_data, 'boundary_conditions': boundary_conditions }
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--solve', action='store_true', help='Solve the model before processing')
parser.add_argument('--input', required=True, help='Input COMSOL model file (.mph)')
parser.add_argument('--output', default='data', help='Output directory for exported data (default: data)')
parser.add_argument('--time', type=str, default=None, help='Time range for time-dependent solve (e.g. "range(0,0.1,1.0)" or "0 0.1 0.2 ... 1.0")')
parser.add_argument('--cells-density', type=float, default=1.0, help='Number of cells per mm (default: 1.0)')
parser.add_argument('--use-tables', action='store_true', help='Read data from existing tables instead of evaluating')
parser.add_argument('--pressure-table', type=str, default='tbl1', help='Tag of the pressure table (default: tbl1)')
parser.add_argument('--velocity-table', type=str, default='tbl2', help='Tag of the velocity table (default: tbl2)')
args = parser.parse_args()
processor = COMSOLProcessor(args.input, time_range=args.time)
ca_data = processor.process(args.output, solve=args.solve, cells_density=args.cells_density, use_tables=args.use_tables, pressure_table_tag=args.pressure_table, velocity_table_tag=args.velocity_table)