-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathnum_solver.py
More file actions
149 lines (130 loc) · 6.53 KB
/
num_solver.py
File metadata and controls
149 lines (130 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# EQs in SF form: dω/dt + dψ/dy * d/dx (ω) - dψ/dx * d/dy(ψ) = nu * ω^2
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
class Solver:
def __init__(self, N: int | float, Lxy: float, dt: float, nu: float, T: float, psi0: np.ndarray, omega0: np.ndarray | bool, forcing_amp: float = 0.0, forcing_k: int = 1):
self.N = N
self.Lxy = Lxy
self.dt = dt
self.nu = nu #kinematic viscosity
self.T = T
self.num_steps = int(T / dt) # Number of time steps
# Initialize grid and wavenumbers
self.xx = np.linspace(0, Lxy, N, False)
self.yy = np.linspace(0, Lxy, N, False)
self.XX, self.YY = np.meshgrid(self.xx, self.yy, indexing='ij')
# Wavenumbers
kk = np.fft.fftfreq(N, Lxy/N) * 2 * np.pi
self.kx = kk.reshape((N, 1))
self.ky = kk.reshape((1, N))
self.k2 = self.kx**2 + self.ky**2
self.k2[0, 0] = 1 # avoid division by zero
#initial conditions
self.psi = psi0
self.psi_hat = np.fft.fft2(self.psi)
self.omega = omega0 if omega0 is not False else np.fft.ifft2(self.k2 * self.psi_hat).real
self.omega_hat = np.fft.fft2(self.omega)
# forcing (Kolmogorov) parameters
# forcing_amp: amplitude of body force in x-direction, f_x = F0 * sin(k*y_phys)
# forcing_k: integer number of periods across domain
self.forcing_amp = forcing_amp
self.forcing_k = forcing_k
self.force = forcing_amp * np.sin(forcing_k * 2 * np.pi / Lxy * self.YY)
# psi = np.sin(XX)*np.sin(YY) #TG vortex IC
# psi_hat = np.fft.fft2(psi)
# omega = 2 * np.sin(XX) * np.sin(YY)
# omega_hat = np.fft.fft2(omega)
def dealias(self, f_hat):
N = f_hat.shape[0]
k_cutoff = N // 3
k_int = np.fft.fftfreq(N) * N
mask_1d = np.abs(k_int) <= k_cutoff
mask_2d = np.outer(mask_1d, mask_1d)
dealiased = f_hat * mask_2d
return dealiased
#J = dpsi/dy * domega/dx - dpsi/dx * domega/dy
def det_jacobian(self, psi_hat):
scale = 1 / (self.Lxy * self.Lxy) # Rescale to match the original domain size
dpsi_dy = np.fft.ifft2(1j * self.ky * psi_hat) * scale
domega_dx = np.fft.ifft2(1j * self.kx * self.k2 * psi_hat) * scale
dpsi_dx = np.fft.ifft2(1j * self.kx * psi_hat) * scale
domega_dy = np.fft.ifft2(1j * self.ky * self.k2 * psi_hat) * scale
# print(f"dpsi/dy: {dpsi_dy}\ndomega/dx: {domega_dx}\ndpsi/dx: {dpsi_dx}\ndomega/dy: {domega_dy}\n")
return dpsi_dy * domega_dx - dpsi_dx * domega_dy
#ETD-RK4 method
def nonlinear(self, psi_hat):
J = self.det_jacobian(self.dealias(psi_hat))
# J_hat = self.dealias(np.fft.fft2(J))
J_hat = np.fft.fft2(J)
forcing_hat = 1j * self.ky * np.fft.fft2(self.force)
return J_hat - forcing_hat
def phi1(self, z):
phi = np.empty_like(z, dtype=np.complex128)
small = np.abs(z) < 1e-6
phi[small] = 1 + z[small] / 2 + z[small]**2 / 6 + z[small]**3 / 24 + z[small]**4 / 120
phi[~small] = (np.exp(z[~small]) - 1)/z[~small]
return phi
def run(self, debug=False) -> np.ndarray | int:
L = -self.nu * self.k2
E = np.exp(self.dt * L)
E2 = np.exp(self.dt * L/2)
phi_E = self.phi1(L * self.dt)
phi_E2 = self.phi1(L * self.dt / 2)
psis = np.zeros((self.num_steps, self.N, self.N), dtype=np.float64)
qs = np.zeros((self.num_steps, self.N, self.N), dtype=np.float64)
# precompute forcing in Fourier space (vorticity forcing)
for step in tqdm(range(self.num_steps)):
if debug:
if not (np.all(np.isfinite(self.omega_hat)) and np.all(np.isfinite(self.psi_hat))):
return step
elif np.any(np.abs(self.omega_hat) > 1e5) or np.any(np.abs(self.psi_hat) > 1e5):
return step
a = E2 * self.omega_hat + self.dt * phi_E2 * self.nonlinear(self.psi_hat)
Na = self.nonlinear(a)
b = E2 * self.omega_hat + self.dt * phi_E2 * Na
Nb = self.nonlinear(b)
c = E * self.omega_hat + self.dt * phi_E * (2* Nb - self.nonlinear(self.psi_hat))
Nc = self.nonlinear(c)
self.omega_hat = E * self.omega_hat + self.dt * (phi_E * self.nonlinear(self.psi_hat) + 2*phi_E*(Na + Nb) + phi_E * Nc)/6
self.omega = np.fft.ifft2(self.omega_hat).real #/ (self.Lxy * self.Lxy) # Rescale to match the original domain size
self.psi_hat = self.omega_hat / self.k2 #i^2 = -1; -1 / - 1 = 1.
self.psi_hat[0, 0] = 0 # Enforce zero mean for
self.psi = np.fft.ifft2(self.psi_hat).real #/ (self.Lxy * self.Lxy)
if step < self.num_steps:
psis[step ] = self.psi
#velocity field
qs[step] = np.sqrt(np.fft.ifft2(1j * self.ky * self.psi_hat).real **2 + np.fft.ifft2(1j * self.kx * self.psi_hat).real **2) # Magnitude of velocity field
return psis, qs
# Animation using matplotlib
def animate_snapshots(self, psis, snapshot_steps):
if len(psis) == 0:
print("No psi snapshots were saved. Animation will not run.")
return
# Select evenly spaced indices
indices = np.linspace(0, len(psis), snapshot_steps, False, dtype=int)
psi_snapshots = psis[indices]
# For title, get the actual time step for each snapshot
step_numbers = indices
# Check shapes
shapes = [np.shape(s) for s in psi_snapshots]
if not all(s == shapes[0] and len(s) == 2 for s in shapes):
raise ValueError(f"Not all psi_snapshots have the same 2D shape: {shapes}")
# Animate
fig, ax = plt.subplots()
im = ax.imshow(psi_snapshots[0], origin='lower', cmap='viridis', extent=(0, self.Lxy, 0, self.Lxy))
ax.set_title(f'Streamfunction ψ, step {step_numbers[0]}')
plt.colorbar(im, ax=ax)
def update(frame):
im.set_data(psi_snapshots[frame])
ax.set_title(f'Streamfunction ψ, step {step_numbers[frame]}')
return (im,)
ani = animation.FuncAnimation(fig, update, frames=len(psi_snapshots), interval=100, blit=False)
plt.show()
X, Y = np.meshgrid(np.linspace(0, 2 * np.pi, 128, False), np.linspace(0, 2 * np.pi, 128, False))
psi = np.sin(X) * np.sin(Y) # TG vortex initial condition
omega = 2 * psi
TG_vortex = Solver(128, 2 * np.pi, 0.001, 10, 1, psi, omega)
# psis = TG_vortex.run()
# TG_vortex.animate_snapshots(psis, 10)