-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti_marker_optical_flow.py
More file actions
3044 lines (2274 loc) · 110 KB
/
multi_marker_optical_flow.py
File metadata and controls
3044 lines (2274 loc) · 110 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sys
import glob
import matplotlib.pyplot as plt
from multi_marker_source_function import *
from sklearn.linear_model import LinearRegression
import scipy.signal as signal
from sklearn.cluster import KMeans
import scipy.stats as stats
DISCARD_HIGH_SPEED = 0
MAX_SPEED=0.70
SHOW_STAIRCASE_ROBOT = 0
SHOW_FILTRATION_SIGNAL_OF = 0
fig_size = (9,6)
adjusto = 0.15
label_size_axes = 20
labelxy_size = 20
title_font_size = 20
y_limiter = 1.3
y_lim_starter = -0.1
sns_dot_size = 80
x_lim_defined = (0,3800)
# Directory where graphs are saved
save_directory = "graphs"
# Ensure the directory exists
os.makedirs(save_directory, exist_ok=True)
#Modifiche: RMS dei residui non media
#poi faccio RADQ(rms^2 piu dev std ^2)
def combined_analysis_all_velocities(modelled_df, vpx_bin_width=30):
"""
Combina l'analisi dei residui medi e della deviazione standard di z per tutte le velocità,
calcolando il valore RMS (radice della somma dei quadrati) per ciascun intervallo di v_px.
Genera un unico grafico che mostra i segnali combinati di tutte le velocità.
Parametri:
- modelled_df: dizionario con DataFrame per ogni velocità, contenente i valori di v_px, z_sim, K e sigma_0.
- vpx_bin_width: ampiezza dell'intervallo per raggruppare i valori di v_px.
Ritorna:
- Nessun valore di ritorno, ma plotta un grafico unico con i segnali combinati per tutte le velocità.
"""
sns.set_theme(style="whitegrid")
plt.figure(figsize=fig_size)
plt.grid(True)
for velocity_key, df in modelled_df.items():
# Numero di punti simulati (viene calcolato dinamicamente)
num_points = sum('vx_' in col for col in df.columns)
# Unisci tutti i valori di v_px e z_sim per tutte le simulazioni in un'unica lista
all_v_px = np.concatenate([df[f'vx_{i}'].values for i in range(1, num_points + 1)])
all_z_sim = np.concatenate([df[f'z_{i}'].values for i in range(1, num_points + 1)])
# Ripeti i valori di K per il numero di punti per ottenere una lista della stessa lunghezza di all_v_px
K_values = np.repeat(df['K'].values, num_points)
# Calcola i residui assoluti per ciascun valore di z e v_px (F1)
residuals = np.abs(all_z_sim - (K_values / all_v_px))
# Definisci i bin per i valori di v_px
vpx_min, vpx_max = all_v_px.min(), all_v_px.max()
bins = np.arange(vpx_min, vpx_max + vpx_bin_width, vpx_bin_width)
# Liste per memorizzare il residuo medio, deviazione standard, e il valore medio di v_px per ogni intervallo
mean_residuals = []
std_devs = []
mean_v_px = []
# Calcolo di residuo medio e deviazione standard per ciascun intervallo di v_px
for i in range(len(bins) - 1):
# Maschera per selezionare i valori che rientrano nell'intervallo
bin_mask = (all_v_px >= bins[i]) & (all_v_px < bins[i + 1])
if np.any(bin_mask): # Se ci sono valori in questo intervallo
# Calcola la media dei residui per i punti nell'intervallo (F1)
mean_residuals.append(np.sqrt(np.mean(residuals[bin_mask]**2)))
# Calcola la deviazione standard dei valori di z_model nell'intervallo (F2)
z_model = K_values[bin_mask] / all_v_px[bin_mask]
std_devs.append(np.std(z_model))
# Calcola il valore medio di v_px nell'intervallo
mean_v_px.append((bins[i] + bins[i + 1]) / 2)
# Calcolo RMS di F1 e F2
combined_values = [np.sqrt(mean_residuals[i]**2 + std_devs[i]**2) for i in range(len(mean_residuals))]
# Aggiungi la curva al grafico (dentro il ciclo)
sns.scatterplot(x=mean_v_px, y=combined_values, marker='o', s=sns_dot_size, alpha=0.8, edgecolor="black",
linewidth=0.6,
label=f'$V_{{ext}}$ = {velocity_key.split("_")[-1]}$ \\left(\\frac{{m}}{{s}}\\right)$')
# Configurazione del grafico
plt.title(r'$\sigma_z$, $\epsilon$ and RMS($\sigma_z$ + $\epsilon$), all $V_{ext}$', fontsize=20)
plt.xlabel(r'$v_{px}$ $\left[\frac{px}{s}\right]$', fontsize=labelxy_size)
plt.ylabel('[m]', fontsize=labelxy_size)
plt.xlim(x_lim_defined)
# Formattazione degli assi
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
# Legenda
plt.legend(fontsize=15)
# Aggiusta gli spazi
plt.subplots_adjust(bottom=adjusto)
# Mantiene il metodo di salvataggio originale
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
# Mostra il grafico
plt.show()
def combined_analysis_sigma_vs_vx(modelled_df, vpx_bin_width=30):
"""
Combina l'analisi dei residui medi (rispetto al modello z = K / v_px) e della deviazione standard di z.
La funzione calcola e plotta la somma dei residui medi e della deviazione standard per ciascun intervallo di v_px.
Inoltre, include grafici separati per i soli residui e per la sola deviazione standard nello stesso grafico.
Parametri:
- modelled_df: dizionario con DataFrame per ogni velocità, contenente i valori di v_px, z_sim, K e sigma_0.
- vpx_bin_width: ampiezza dell'intervallo per raggruppare i valori di v_px.
Ritorna:
- Nessun valore di ritorno, ma plotta un grafico dettagliato per ogni velocità.
"""
for velocity_key, df in modelled_df.items():
# Numero di punti simulati (viene calcolato dinamicamente)
num_points = sum('vx_' in col for col in df.columns)
# Unisci tutti i valori di v_px e z_sim per tutte le simulazioni in un'unica lista
all_v_px = np.concatenate([df[f'vx_{i}'].values for i in range(1, num_points + 1)])
all_z_sim = np.concatenate([df[f'z_{i}'].values for i in range(1, num_points + 1)])
# Ripeti i valori di K per il numero di punti per ottenere una lista della stessa lunghezza di all_v_px
K_values = np.repeat(df['K'].values, num_points)
# Calcola i residui assoluti per ciascun valore di z e v_px (F1)
residuals = np.abs(all_z_sim - (K_values / all_v_px))
# Definisci i bin per i valori di v_px
vpx_min, vpx_max = all_v_px.min(), all_v_px.max()
bins = np.arange(vpx_min, vpx_max + vpx_bin_width, vpx_bin_width)
# Liste per memorizzare il residuo medio, deviazione standard, e il valore medio di v_px per ogni intervallo
mean_residuals = []
std_devs = []
mean_v_px = []
# Calcolo di residuo medio e deviazione standard per ciascun intervallo di v_px
for i in range(len(bins) - 1):
# Maschera per selezionare i valori che rientrano nell'intervallo
bin_mask = (all_v_px >= bins[i]) & (all_v_px < bins[i + 1])
if np.any(bin_mask): # Se ci sono valori in questo intervallo
# Calcola la media dei residui per i punti nell'intervallo (F1)
mean_residuals.append(np.sqrt(np.mean(residuals[bin_mask]**2)))
# Calcola la deviazione standard dei valori di z_model nell'intervallo (F2)
z_model = K_values[bin_mask] / all_v_px[bin_mask]
std_devs.append(np.std(z_model))
# Calcola il valore medio di v_px nell'intervallo
mean_v_px.append((bins[i] + bins[i + 1]) / 2)
# Somma di F1 e F2
combined_values = [np.sqrt(mean_residuals[i]**2 + std_devs[i]**2) for i in range(len(mean_residuals))]
# Imposta lo stile Seaborn
sns.set_theme(style="whitegrid") # Usa "whitegrid" per mantenere la griglia visibile
# Crea la figura
plt.figure(figsize=fig_size)
# Disegna le linee mantenendo lo stile originale
sns.lineplot(x=mean_v_px, y=combined_values, color='g', label=r'Total Uncertainty')
sns.lineplot(x=mean_v_px, y=mean_residuals, color='b', linestyle='--', label=r'RMS Residuals $\epsilon$')
sns.lineplot(x=mean_v_px, y=std_devs, color='r', linestyle=':', label=r'$\sigma_z$')
# Aggiungi scatterplot per rendere i punti più visibili
sns.scatterplot(x=mean_v_px, y=combined_values, marker='o', color='g', s=sns_dot_size, alpha=0.5, edgecolor="black")
sns.scatterplot(x=mean_v_px, y=mean_residuals, marker='x', color='b', s=sns_dot_size, alpha=1, edgecolor="black")
sns.scatterplot(x=mean_v_px, y=std_devs, marker='s', color='r', s=sns_dot_size, alpha=1, edgecolor="black")
# Titolo e assi
clean_velocity_key = velocity_key.split("_")[-1].strip().replace('\x0c', '').replace('\r', '')
plt.title(f'RMS sum contrib, Vext = {clean_velocity_key} (m/s)', fontsize=title_font_size)
plt.xlabel(r'$v_{px}$ [$\frac{px}{s}$]', fontsize=labelxy_size)
plt.ylabel('[m]', fontsize=labelxy_size)
plt.xlim(x_lim_defined)
plt.ylim(y_lim_starter, y_limiter)
# Modifica la grandezza dei numeri sugli assi
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
# Aggiusta lo spazio inferiore se necessario
plt.subplots_adjust(bottom=adjusto)
# Legenda con lo stesso font size
plt.legend(fontsize=15)
# Mantiene il metodo di salvataggio
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
# Mostra il grafico
plt.show()
@log_function_call
def analyze_std_dev_of_zModel_by_vx(modelled_df, vpx_bin_width=30):
"""
Calcola la deviazione standard di z previsto dal modello z = K / v_px per ogni intervallo di velocità v_px.
Combina i risultati di tutte le velocità in un singolo grafico.
Parametri:
- modelled_df: dizionario con DataFrame per ogni velocità, contenente i valori di v_px, z_sim, K e sigma_0.
- vpx_bin_width: ampiezza dell'intervallo per raggruppare i valori di v_px.
Ritorna:
- Nessun valore di ritorno, ma plotta un grafico unico della deviazione standard.
"""
plt.figure(figsize=fig_size)
plt.grid(True)
sns.set_theme(style="whitegrid")
for velocity_key, df in modelled_df.items():
# Numero di punti simulati
num_points = sum('vx_' in col for col in df.columns)
# Estrazione di tutti i valori di v_px e K come array
v_px_values = np.hstack([df[f'vx_{i}'].values.reshape(-1, 1) for i in range(1, num_points + 1)])
K_values = df['K'].values.reshape(-1, 1) # Reshape per broadcast su colonne
# Calcolo di z_model per ogni colonna di v_px e per ogni simulazione
z_model = K_values / v_px_values
# Flatten dei valori di v_px e z_model
all_v_px_flat = v_px_values.flatten()
all_z_model_flat = z_model.flatten()
# Definisci i bin per i valori di v_px
vpx_min, vpx_max = all_v_px_flat.min(), all_v_px_flat.max()
bins = np.arange(vpx_min, vpx_max + vpx_bin_width, vpx_bin_width)
# Liste per memorizzare la deviazione standard di z e il valore medio di v_px per ogni intervallo
std_devs = []
mean_v_px = []
# Calcolo della deviazione standard per ciascun intervallo di v_px
for i in range(len(bins) - 1):
# Maschera per selezionare i valori che rientrano nell'intervallo
bin_mask = (all_v_px_flat >= bins[i]) & (all_v_px_flat < bins[i + 1])
if np.any(bin_mask): # Se ci sono valori in questo intervallo
# Calcola la deviazione standard dei valori di z_model nell'intervallo
std_devs.append(np.std(all_z_model_flat[bin_mask]))
# Calcola il valore medio di v_px nell'intervallo
mean_v_px.append((bins[i] + bins[i + 1]) / 2)
# Aggiungi al grafico (dentro il ciclo)
sns.scatterplot(x=mean_v_px, y=std_devs, marker='o', s=sns_dot_size, alpha=0.8, edgecolor="black",
label=f'$V_{{ext}}$ = {velocity_key.split("_")[-1]}$ \\left(\\frac{{m}}{{s}}\\right)$')
# Configurazione del grafico
plt.title(r'STD of $z_{model}$: $\sigma_z$', fontsize=title_font_size)
plt.xlabel(r'$v_{px}$ $\left[\frac{px}{s}\right]$', fontsize=labelxy_size)
plt.ylabel(r'$\sigma_z$ [m]', fontsize=labelxy_size)
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
plt.ylim(-0.01,0.1)
plt.xlim(x_lim_defined)
# Mostra la legenda
plt.legend(fontsize=15)
# Aggiusta gli spazi
plt.subplots_adjust(bottom=adjusto)
# Mantiene il metodo di salvataggio originale
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
# Mostra il grafico
plt.show()
def calculate_video_dt_std(folder_path):
"""
Calcola la media e la deviazione standard dei tempi di frame (delta_t) per ciascun video in una directory.
Parametri:
- folder_path: percorso della directory contenente i file video (.mp4).
Ritorna:
- Un dizionario con la media e la deviazione standard di delta_t per ogni video.
"""
results = {}
# Itera su tutti i file .mp4 nella directory specificata
for file_name in os.listdir(folder_path):
if file_name.endswith('.MP4'):
video_path = os.path.join(folder_path, file_name)
print(f"Analizzando il video: {video_path}")
# Apri il video
cap = cv2.VideoCapture(video_path)
# Lista per memorizzare i timestamp dei frame
timestamps = []
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Estrai il timestamp del frame corrente in millisecondi
timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)
timestamps.append(timestamp)
# Chiude il video
cap.release()
# Calcola delta_t tra i frame consecutivi (in ms)
delta_t = np.diff(timestamps)
# Filtra i valori nulli e quelli fuori dall'intervallo [16.63 ms, 16.72 ms]
valid_delta_t = delta_t[(delta_t != 0) & (delta_t >= 16.63) & (delta_t <= 16.72)]
# Conta e stampa il numero di valori esclusi (nulli o fuori dall'intervallo)
excluded_count = len(delta_t) - len(valid_delta_t)
print(
f"Video: {file_name}, Numero di valori esclusi in delta_t (nulli o fuori intervallo): {excluded_count}")
# Calcola deviazione standard di delta_t (in ms)
std_delta_t = np.std(valid_delta_t)
# Propaga l'incertezza su 1 secondo (60 frame)
std_delta_t_1s = np.sqrt(60) * std_delta_t
# Salva i risultati nel dizionario
results[file_name] = {'std_delta_t_1s_ms': std_delta_t_1s, 'excluded_count': excluded_count}
# Stampa i risultati per il controllo
print(f"Video: {file_name}, Incertezza sul tempo per 1 secondo: {std_delta_t_1s:.12f} ms")
# Plot dei valori di delta_t con scatter
plt.figure(figsize=fig_size)
plt.grid(True)
indices = np.arange(len(delta_t))
plt.scatter(indices, delta_t, color='blue', label='delta_t (tutti in ms)')
plt.scatter(indices[(delta_t == 0) | (delta_t < 16.63) | (delta_t > 16.72)],
delta_t[(delta_t == 0) | (delta_t < 16.63) | (delta_t > 16.72)],
color='red', label='Valori esclusi')
plt.xlabel("Indice del Frame")
plt.ylabel("Delta t (ms)")
plt.title(f"Distribuzione dei delta_t per il video: {file_name}")
plt.legend()
plt.subplots_adjust(bottom=adjusto)
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
# Generate a file name based on the next file number
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
plt.show()
return results
def plot_all_simulated_values(modelled_df):
"""
Per ogni DataFrame nelle simulazioni delle velocità 3D, stima K usando tutti i punti simulati
di v_px e z, e poi plotti i punti simulati e la curva stimata usando il modello z = K / v_px.
Inoltre, calcola il valore di sigma_0 (deviazione standard dei residui).
Parametri:
- modelled_df: dizionario con DataFrame per ogni velocità, contenente i valori di v_px, z_sim,
K e sigma_0.
Ritorna:
- Nessun valore di ritorno, ma plotti i punti simulati e la curva stimata per ogni velocità.
"""
for velocity_key, df in modelled_df.items():
# Ricava il numero di punti dinamicamente basandosi sulle colonne vx_sim
num_points = sum('vx_' in col for col in df.columns)
# Colleziona tutti i valori di v_px e z per stimare K
all_v_px = []
all_z_sim = []
for i in range(1, num_points + 1):
all_v_px.extend(df[f'vx_{i}'].values) # Aggiungi i valori v_px_i
all_z_sim.extend(df[f'z_{i}'].values) # Aggiungi i valori z_sim_i
all_v_px = np.array(all_v_px)
all_z_sim = np.array(all_z_sim)
# Stima K su tutti i punti simulati usando curve_fit
popt, _ = curve_fit(hyperbolic_model, all_v_px, all_z_sim)
K_estimated = popt[0] # Valore stimato di K
# Calcola i residui come (z_sim - z_model)
z_model = hyperbolic_model(all_v_px, K_estimated) # Modello z = K / v_px
residuals = all_z_sim - z_model # Residui = z_sim - z_model
# Calcola sigma_0 (deviazione standard dei residui)
m = len(all_v_px) # Numero totale di punti
sigma_0 = calculate_sigma(residuals, m, n=1) # Calcola sigma_0
# Genera valori equidistanti di v_px tra il minimo e il massimo
v_px_fitted = np.linspace(all_v_px.min(), all_v_px.max(), 100)
z_fitted = hyperbolic_model(v_px_fitted, K_estimated) # Calcola i valori z teorici
# Plot dei punti simulati (scatter) e della curva stimata (line)
plt.figure(figsize=fig_size)
plt.grid(True)
plt.scatter(all_v_px, all_z_sim, s=5, color='blue', marker="o", alpha=0.005, label='Punti simulati')
plt.plot(v_px_fitted, z_fitted, color='red', linewidth=2, label=f'Curva stimata: K={K_estimated:.2f}')
plt.title(f'All simulation for $V_{{ext}}$ = {velocity_key.split("_")[-1]} ($\\frac{{m}}{{s}}$)')
plt.xlabel('$v_{px}$ [$\\frac{px}{s}$]', fontsize=20)
plt.ylabel('$z [m]$', fontsize=20)
plt.ylim(0, 3)
plt.xlim(0,3800)
plt.legend(fontsize=15)
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
plt.subplots_adjust(bottom=adjusto)
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
# Generate a file name based on the next file number
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
plt.show()
# Stampa il valore di sigma_0
print(f'Sigma_0 - ALL SIM - residui {velocity_key}: {sigma_0:.4f}')
def analyze_clusters_sigma_vs_px_velocity(modelled_df, vpx_bin_width=10):
"""
Per ogni DataFrame che contiene i valori di K e sigma_0, calcola i residui rispetto
al modello z = K / v_px per ogni punto simulato. Divide i dati di v_px in intervalli di ampiezza fissa
(es. 10 unità), calcola la media dei residui per ogni intervallo, e plotta un singolo grafico con tutti i set di dati.
Parametri:
- modelled_df: dizionario con DataFrame per ogni velocità, contenente i valori di v_px, z_sim, K e sigma_0.
- vpx_bin_width: ampiezza dell'intervallo per raggruppare i valori di v_px.
Ritorna:
- Nessun valore di ritorno, ma plotta un grafico unico con tutte le velocità.
"""
# Imposta lo stile di Seaborn
sns.set_theme(style="whitegrid")
plt.figure(figsize=fig_size)
plt.grid(True)
for velocity_key, df in modelled_df.items():
# Numero di punti simulati (viene calcolato dinamicamente)
num_points = sum('vx_' in col for col in df.columns)
# Unisci tutti i valori di v_px e z_sim per tutte le simulazioni in un'unica lista
all_v_px = np.concatenate([df[f'vx_{i}'].values for i in range(1, num_points + 1)])
all_z_sim = np.concatenate([df[f'z_{i}'].values for i in range(1, num_points + 1)])
# Ripeti i valori di K per il numero di punti per ottenere una lista della stessa lunghezza di all_v_px
K_values = np.repeat(df['K'].values, num_points)
# Calcola i residui assoluti per ciascun valore di z e v_px
residuals = np.abs(all_z_sim - (K_values / all_v_px))
# Bin dei valori di v_px
vpx_min, vpx_max = all_v_px.min(), all_v_px.max()
bins = np.arange(vpx_min, vpx_max + vpx_bin_width, vpx_bin_width)
# Calcola il residuo medio per ciascun intervallo di v_px
mean_residuals = []
mean_v_px = []
for i in range(len(bins) - 1):
# Intervallo corrente
bin_mask = (all_v_px >= bins[i]) & (all_v_px < bins[i + 1])
if np.any(bin_mask): # Se ci sono valori nell'intervallo
# Media di v_px e residuo per i punti che cadono in questo intervallo
mean_residuals.append(np.sqrt(np.mean(residuals[bin_mask]**2)))
mean_v_px.append(np.mean(all_v_px[bin_mask]))
# Aggiungi i punti con trasparenza e bordo nero per migliorare la leggibilità
sns.scatterplot(x=mean_v_px, y=mean_residuals, marker='o', s=sns_dot_size, alpha=0.8, edgecolor="black",
linewidth=0.6,
label=f'$V_{{ext}}$ = {velocity_key.split("_")[-1]}$ \\left(\\frac{{m}}{{s}}\\right)$')
# Configurazione del grafico
plt.title('RMS Residuals of RANGE', fontsize=20)
plt.xlabel(r'$v_{px}$ $\left[\frac{px}{s}\right]$', fontsize=20)
plt.ylabel(r'$\epsilon$ [m]', fontsize=20)
plt.ylim(y_lim_starter,y_limiter)
plt.xlim(x_lim_defined)
# Formattazione degli assi
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
# Legenda
plt.legend(fontsize=15)
# Aggiusta gli spazi
plt.subplots_adjust(bottom=adjusto)
# Mantiene il metodo di salvataggio originale
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
# Mostra il grafico
plt.show()
def plot_sigma_histograms(updated_dfs):
"""
Per ogni DataFrame nel dizionario 'updated_dfs', genera un istogramma di sigma_0 in percentuale,
calcola il valore RMS di sigma_0, disegna la gaussiana perfetta normalizzata per adattarsi
all'istogramma in percentuale, e una retta verticale sul valore RMS calcolato.
Parametri:
- updated_dfs: dizionario con DataFrame per ogni velocità, contenente i valori di K e sigma_0
Ritorna:
- Un dizionario con il valore RMS di sigma_0 per ogni velocità.
"""
sigma_rms_values = {} # Dizionario per salvare i valori RMS di sigma_0 per ogni velocità
# Itera su ogni DataFrame nel dizionario delle velocità
for velocity_key, velocity_df in updated_dfs.items():
# Estrai i valori di sigma_0 dal DataFrame
sigma_0_values = velocity_df['sigma_0'].values
# Calcola il valore RMS di sigma_0 per questa velocità
sigma_rms = np.sqrt(np.mean(sigma_0_values ** 2))
sigma_rms_values[velocity_key] = sigma_rms # Salva il valore RMS per questa velocità
# Parametri per la distribuzione normale (media e deviazione standard)
mu, std = np.mean(sigma_0_values), np.std(sigma_0_values)
# Genera l'istogramma per ottenere le informazioni sui bin
num_bins = 20
counts, bin_edges = np.histogram(sigma_0_values, bins=num_bins)
bin_width = bin_edges[1] - bin_edges[0]
total_count = len(sigma_0_values)
# Converti i conteggi in percentuale
counts_percentage = (counts / total_count) * 100
# Genera la gaussiana normalizzata per l'istogramma in percentuale
x = np.linspace(bin_edges[0], bin_edges[-1], 1000)
gaussian = (stats.norm.pdf(x, mu, std) * total_count * bin_width) / total_count * 100
# Plot dell'istogramma
plt.figure(figsize=fig_size)
plt.grid(True)
plt.hist(sigma_0_values, bins=20, color='skyblue', edgecolor='black', alpha=0.7,
label='$\sigma_0$')
plt.plot(x, gaussian, color='black', linestyle='--')
plt.axvline(sigma_rms, color='red', linestyle='--', label=f'RMS $\\sigma_0$: {sigma_rms:.4f}')
# Configurazione del grafico
plt.title(f'Histogram of $\\sigma_0$, $V_{{ext}}$ = {velocity_key.split("_")[-1]} $\\frac{{m}}{{s}}$', fontsize=20)
plt.xlabel('$\\sigma_0$ [m]', fontsize=20)
plt.ylabel('Frequency (%)', fontsize=20)
plt.legend(fontsize=15)
# Configurazione dei ticks con font impostato
plt.tick_params(axis='both', which='major', labelsize=label_size_axes)
plt.subplots_adjust(bottom=adjusto)
existing_files = [f for f in os.listdir(save_directory) if os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
# Generate a file name based on the next file number
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
plt.close()
#plt.show()
# Stampa il valore RMS di sigma_0
print(f'Valore RMS di $\\sigma_0$ per la velocità {velocity_key}: {sigma_rms:.4f} [m]')
return sigma_rms_values
def hyperbolic_model(vx, K):
return K / vx
def calculate_sigma(residuals, m, n):
"""Calculates standard deviation of residuals (sigma_0)."""
return np.sqrt(np.sum(residuals**2) / (m - n))
def estimate_k_and_sigma0(velocity_simulations_dfs):
"""
Per ogni DataFrame nelle simulazioni delle velocità, fitta il modello z = K / vx.
Per ogni riga, calcola K, calcola i residui, e aggiungi il valore di K e sigma_0
in due nuove colonne per ogni DataFrame.
Parametri:
- velocity_simulations_dfs: dizionario con DataFrame per ogni velocità
Ritorna:
- Un dizionario con DataFrame aggiornati, dove ogni DataFrame ha 2 nuove colonne:
'K' e 'sigma_0'
"""
# Dizionario per salvare i DataFrame aggiornati
updated_dfs = {}
# Itera su ogni DataFrame nel dizionario delle simulazioni delle velocità
for velocity_key, velocity_df in velocity_simulations_dfs.items():
# Liste per salvare K e sigma_0 per ogni riga
k_values = []
sigma_0_values = []
# Numero di punti, metà per vx e metà per z
num_points = len([col for col in velocity_df.columns if col.startswith("vx_")])
# Itera su ogni riga del DataFrame (ogni riga è una simulazione)
for index, row in velocity_df.iterrows():
# Estrai tutti i valori di vx e z
vx_sim = row[[f'vx_{i + 1}' for i in range(num_points)]].values
z_sim = row[[f'z_{i + 1}' for i in range(num_points)]].values
# Fitta il modello z = K / vx usando curve_fit
popt, _ = curve_fit(hyperbolic_model, vx_sim, z_sim)
K_estimated = popt[0] # Il valore stimato di K
# Calcola i residui (differenza tra z_sim e il modello predetto)
z_model = hyperbolic_model(vx_sim, K_estimated)
residuals = z_sim - z_model
# Calcola sigma_0 (deviazione standard dei residui)
sigma_0 = calculate_sigma(residuals, m=len(vx_sim), n=1)
# Aggiungi K e sigma_0 alle liste
k_values.append(K_estimated)
sigma_0_values.append(sigma_0)
DRAW_CURVE_FIT = 0
if DRAW_CURVE_FIT:
# Genera 50 valori di vx tra il minimo e il massimo dei valori reali
vx_fitted = np.linspace(vx_sim.min(), vx_sim.max(), 50)
z_fitted = hyperbolic_model(vx_fitted, K_estimated) # Calcola i valori z teorici
# Plot dei dati e della curva stimata con i 50 punti
plt.figure(figsize=fig_size)
plt.scatter(vx_sim, z_sim, color='blue', label='Data points')
plt.plot(vx_fitted, z_fitted, color='red', label=f'Fitted curve: K={K_estimated:.2f}')
plt.title(f'Simulation {index + 1} - Velocity: {velocity_key}')
plt.xlabel('vx')
plt.ylabel('z')
plt.legend()
plt.grid(True)
plt.subplots_adjust(bottom=adjusto)
existing_files = [f for f in os.listdir(save_directory) if
os.path.isfile(os.path.join(save_directory, f))]
next_file_number = len(existing_files) + 1
# Generate a file name based on the next file number
file_name = f"{next_file_number}.png"
file_path = os.path.join(save_directory, file_name)
plt.savefig(file_path, bbox_inches='tight')
plt.show()
# Aggiungi le nuove colonne 'K' e 'sigma_0' al DataFrame corrente
velocity_df['K'] = k_values
velocity_df['sigma_0'] = sigma_0_values
# Salva il DataFrame aggiornato nel dizionario
updated_dfs[velocity_key] = velocity_df
return updated_dfs
def apply_moving_average_filter(vx_simulated, z_simulated, win):
"""
Applica un filtro a media mobile a vx_simulated e regola la lunghezza di z_simulated
per adattarsi alla lunghezza ridotta di vx_simulated a causa del filtro.
Parametri:
- vx_simulated: array dei valori simulati di vx.
- z_simulated: array dei valori simulati di z.
- win: dimensione della finestra per il filtro a media mobile.
Ritorna:
- vx_filtered: array di vx simulati filtrati.
- z_adjusted: array di z simulati adattati.
"""
# Applica il filtro a media mobile a vx_simulated
vx_filtered = np.convolve(vx_simulated, np.ones(win) / win, mode='valid')
# Adatta la lunghezza di z_simulated
z_adjusted = z_simulated[:len(vx_filtered)]
return vx_filtered, z_adjusted
def generate_montecarlo_simulations(window, file_path, num_simulations=5000, z_std_dev=0.05, vpx_std_dev=10.0, plot=False):
def moving_average_filter(vx_values, window):
"""
Applica un filtro a media mobile "trailing" al vettore vx_values:
- Per gli indici in cui è disponibile una finestra completa (lunghezza = window),
calcola la media dei valori correnti e dei successivi (i.e. np.convolve in mode 'valid').
- Per gli ultimi elementi per cui non esiste una finestra completa,
utilizza la media calcolata per l'ultima finestra completa.
L'output ha la stessa lunghezza dell'input.
"""
N = len(vx_values)
if N < window:
return vx_values.copy()
# Calcola la media mobile per tutte le finestre complete
valid_avg = np.convolve(vx_values, np.ones(window) / window, mode='valid')
# valid_avg ha lunghezza: N - window + 1
# Calcola quanti elementi mancano per arrivare a N
pad_length = N - valid_avg.shape[0]
# Crea un array di lunghezza pad_length, riempito con l'ultimo valore della media mobile
pad = np.full(pad_length, valid_avg[-1])
# Concatena il risultato della convoluzione e il padding, ottenendo un array di lunghezza N
return np.concatenate([valid_avg, pad])
"""
Esegue simulazioni Monte Carlo per ciascuna velocità 3D (v_3d) in un file di input.
Filtra i dati di velocità v_px con una media mobile, quindi genera simulazioni Monte Carlo.
Parametri:
- file_path: percorso del file Excel con i dati originali.
- num_simulations: numero di simulazioni Monte Carlo da eseguire per ciascuna velocità 3D.
- z_std_dev: deviazione standard per z_mean da utilizzare nelle simulazioni.
- vpx_std_dev: deviazione standard per v_px da utilizzare nelle simulazioni.
- plot: flag per attivare/disattivare il plot di ogni simulazione.
Ritorna:
- Un dizionario con una chiave per ogni velocità (v_3d) e un DataFrame contenente i risultati delle simulazioni.
"""
# Carica i dati dal file Excel
print("SIMULATING :",num_simulations)
df = pd.read_excel(file_path)
velocity_simulations_dfs = {}
# Identifica le velocità uniche in vx_3D
velocities = df['vx_3D'].unique()
# Dizionario per salvare un DataFrame di simulazioni per ogni velocità
# Itera su ciascuna velocità 3D
for velocity in velocities:
# Filtra i dati per la velocità corrente
velocity_df = df[df['vx_3D'] == velocity]
# Filtra vx con una media mobile, se necessario
if window > 1:
print(f"WINDOWING for velocity {velocity}")
# Identifica i marker unici
vx_filtered = np.empty(len(velocity_df))
vx_filtered_temp = []
# Raggruppa velocity_df in blocchi consecutivi in cui il marker è costante.
groups = [group for _, group in
velocity_df.groupby((velocity_df['marker'] != velocity_df['marker'].shift()).cumsum())]
# Applica il filtro per ciascun blocco separato
for group in groups:
# Ottieni gli indici originali di questo blocco
indices = group.index
# Estrai i valori originali di vx per questo blocco
vx_values = group['vx'].values
filtered_values = moving_average_filter(vx_values, window)
# Inserisci i valori filtrati nelle posizioni originali dell'array finale
vx_filtered_temp.append(filtered_values)
# Inserisci i valori filtrati nelle posizioni originali
vx_filtered = np.concatenate(vx_filtered_temp)
z_adjusted = velocity_df['z_mean'].values
else:
vx_filtered = velocity_df['vx'].values
z_adjusted = velocity_df['z_mean'].values
# Lista per memorizzare tutte le simulazioni per la velocità corrente
all_simulations = []
# Esegui le simulazioni Monte Carlo
for sim_num in range(num_simulations):
if sim_num % 50 == 0:
print(sim_num, end=" ")
# Genera valori simulati per ciascun punto del dataset
vx_simulated = np.abs(np.random.normal(vx_filtered * 60, vpx_std_dev))
z_simulated = np.random.normal(z_adjusted, z_std_dev)
# Crea una riga di simulazione concatenando i valori simulati di vx e z
simulation_row = np.concatenate([vx_simulated, z_simulated])
all_simulations.append(simulation_row)
# Plot della simulazione corrente, se richiesto
if plot:
plt.figure(figsize=fig_size)
plt.scatter(vx_simulated, z_simulated, color='blue', alpha=0.6, label='Simulazione Monte Carlo')
plt.xlabel('v_px (simulato) [px/s]')
plt.ylabel('z (simulato)')
plt.title(f'Simulazione Monte Carlo #{sim_num + 1} per velocità {velocity}')
plt.grid(True)
plt.legend()
plt.subplots_adjust(bottom=adjusto)
plt.show()
print("simulation terminated")
# Nomi delle colonne per i risultati (es. vx_1, vx_2, ..., z_1, z_2, ...)
column_names = [f'vx_{i + 1}' for i in range(len(vx_filtered))] + [f'z_{i + 1}' for i in range(len(vx_filtered))]
# Trasforma i risultati delle simulazioni in un DataFrame
simulations_df = pd.DataFrame(all_simulations, columns=column_names)
# Salva il DataFrame nel dizionario con la velocità come chiave
velocity_simulations_dfs[f'velocity_{velocity}'] = simulations_df
return velocity_simulations_dfs
def extract_mu_and_sigma_from_marker(file_path, std_px=1, z_std_dev_literature=0.05, apply_moving_average=True):
"""
Calcola l'incertezza propagata su 'v_px' da un file Excel.
Usa propagazione dell'incertezza su v_px basata su std_px e std_t (deviazione std del delta_t tra frame).
Parametri:
- file_path: percorso del file Excel
- std_px: deviazione standard sullo spostamento in pixel (default=1)
- z_std_dev_literature: deviazione standard per z_mean dalla letteratura
- apply_moving_average: flag per attivare/disattivare la media mobile a 3 elementi su 'v_px'
Ritorna:
- Un dizionario con solo la deviazione standard propagata per v_px e z_mean per ogni velocità.
"""
video_path ="/home/mmt-ben/DepthFromOpticalFlow/video_raw"
dt_results = calculate_video_dt_std(video_path)
print(dt_results)
# Legge il file Excel
df = pd.read_excel(file_path)
print(df)
# Trasforma la colonna 'vx' in valore assoluto e moltiplica per 60
df['vx'] = df['vx'].abs() * 60
# Calcola la differenza di tempo tra i frame consecutivi
df['delta_t'] = df['time'].diff().fillna(0) # Differenza di tempo tra i frame
std_t = df['delta_t'].std() # Deviazione standard di delta_t
# Applica la media mobile a 3 elementi su 'vx' se la flag è attivata
if apply_moving_average:
df['vx'] = df['vx'].rolling(window=3, min_periods=1).mean()
# Raggruppa i dati solo per 'vx_3D'
grouped = df.groupby(['vx_3D'])
# Dizionario per salvare solo la deviazione standard propagata
results = {}
# Calcolo della propagazione dell'incertezza su v_px per ciascun gruppo vx_3D
for vx_3D, group in grouped:
# Calcola la deviazione standard propagata su v_px
delta_px_mean = group['pixel_displacement'].mean() # Spostamento medio in pixel
delta_t_mean = group['delta_t'].mean() # Tempo medio tra frame
std_vpx = (std_px / delta_px_mean) * np.sqrt((std_px / delta_px_mean)**2 + (std_t / delta_t_mean)**2)
# Salva solo i risultati di std_vpx e z_std_dev_literature per vx_3D
results[vx_3D] = {
'std_vpx': std_vpx,
'z_std_dev_literature': z_std_dev_literature
}
# Stampa i risultati per controllo
print(f"Velocità {vx_3D}: std_vpx = {std_vpx:.4f}, z_std_dev_literature = {z_std_dev_literature}")
print(results)
sys.exit()
return results
# Esempio di utilizzo:
# result_dict = extract_mu_and_sigma_from_marker('path_to_file.xlsx', apply_moving_average=True, frame_time_std=0.01)
def bland_altman_plot(x, y, k, save_path, file_name="bland_altman_plot.png"):
"""
Crea e salva un grafico Bland-Altman per valutare il fitting del modello y = k/x.
Parametri:
x (array): Valori osservati della variabile indipendente.
y (array): Valori osservati della variabile dipendente.
k (float): Parametro del modello y = k/x.
save_path (str): Cartella in cui salvare il grafico.
file_name (str): Nome del file per il grafico salvato (default: "bland_altman_plot.png").
"""
# Calcolo delle previsioni
y_pred = k / x
# Calcolo delle differenze e delle medie
diff = y - y_pred
mean = (y + y_pred) / 2
# Calcolo della media e della deviazione standard delle differenze
mean_diff = np.mean(diff)
std_diff = np.std(diff, ddof=1)
# Limiti di accordo
lower_limit = mean_diff - 1.96 * std_diff
upper_limit = mean_diff + 1.96 * std_diff
# Creazione del grafico Bland-Altman
plt.figure(figsize=fig_size)
plt.scatter(mean, diff, color='blue', s=50, label='Differenze')
plt.axhline(mean_diff, color='gray', linestyle='--', label='Media delle differenze')
plt.axhline(lower_limit, color='red', linestyle='--', label='Limite inferiore (95%)')
plt.axhline(upper_limit, color='red', linestyle='--', label='Limite superiore (95%)')
plt.xlabel('Media dei valori osservati e previsti')
plt.ylabel('Differenza tra valori osservati e previsti')
plt.title('Grafico Bland-Altman')
plt.legend()
plt.grid(True)
plt.show()
# Creazione della cartella se non esiste
if not os.path.exists(save_path):
os.makedirs(save_path)
# Salvataggio del grafico
save_file = os.path.join(save_path, file_name)
plt.savefig(save_file)
plt.close()
def convert_robot_in_staircase_signal(noisy_signal, quantization_levels = v_all):
# Interpolazione dei vuoti (se ci sono NaN)
noisy_signal = pd.Series(noisy_signal).interpolate().tolist()
# Applica un filtro passa basso per ridurre il rumore
b, a = signal.butter(3, 0.05)
smoothed_signal = signal.filtfilt(b, a, noisy_signal)
# Aggiungi lo zero ai livelli di quantizzazione
quantization_levels = np.array([0] + quantization_levels)