-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdiscrete_structure.py
More file actions
394 lines (315 loc) · 15.8 KB
/
discrete_structure.py
File metadata and controls
394 lines (315 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
import pandas as pd
import pybnesian as pb
from itertools import product
import math
import numpy as np
import random
from concurrent.futures import ThreadPoolExecutor
import itertools
# Helper function to emulate starmap for ThreadPoolExecutor
# def thread_starmap(executor, func, args_list):
# futures = [executor.submit(func, *args) for args in args_list]
# return [future.result() for future in futures]
import sys
import time
def log_debug(msg):
sys.stderr.write(f"[DEBUG {time.strftime('%H:%M:%S')}] {msg}\n")
sys.stderr.flush()
#Tiene completado de dataset
def EM(red, dataframe, clusters_names, kmax=100):
k = 0
rb = red.clone()
df = dataframe.copy()
# Instanciamos los parámetros de la variable cluster, como se va a hacer stochastic EM en vez de fijar los parámetros simplemente sampleamos una muestra de la variable cluster
# para parámetros sampleados de una uniforme.
probability_distribution = [np.random.uniform() for i in range(len(clusters_names))]
ini_clust = weighted_choice(clusters_names, probability_distribution)
df['cluster'] = ini_clust
#Dataframe completation
# Prepare categories for combinations
categor = [dataframe[var].cat.categories.tolist() for var in dataframe.columns]
categor.append(clusters_names)
# Generate combinations in parallel
x = parallel_generate_combinations(categor, df.columns, num_chunks=10)
# Concatenate the generated combinations with the original DataFrame
df = pd.concat([df, x], ignore_index=True)
for var in df.columns:
df[var] = df[var].astype('category')
# Fit
# Con esta muestra sampleada hacemos el primer fit de la red utilizando la libreria de pybnesian.
rb.fit(df)
# Comenzamos las iteraciones del algoritmo
while k < kmax:
df = dataframe.copy()
logl = parallel_compute_logl(clusters_names, df, rb) # esta lista contiene las listas de los loglikelihoods para cada punto del dataframe con cada posible cluster: P(c'k',X)
# Este loglikelihood se utilizará para el cálculo de P(C|X)=P(C,X)/P(X). Como C es categórica con elevar e al loglikelhood tenemos P(C,X) y normalizando P(C|X)
new_c = parallel_sample_clusters(df, logl, clusters_names) # esta lista va a contener la nueva muestra sampleada de P(C|X) en el paso expectation
df['cluster'] = new_c
#Dataframe completation
# Prepare categories for combinations
categor = [dataframe[var].cat.categories.tolist() for var in dataframe.columns]
categor.append(clusters_names)
# Generate combinations in parallel
x = parallel_generate_combinations(categor, df.columns, num_chunks=10)
# Concatenate the generated combinations with the original DataFrame
df = pd.concat([df, x], ignore_index=True)
for var in df.columns:
df[var] = df[var].astype('category')
# Fit
# Con esta muestra sampleada hacemos el fit de la red utilizando la libreria de pybnesian.
rb.fit(df)
rb = red.clone()
rb.fit(df)
k = k + 1
return rb
'''def structure_logl(red, dataframe, clusters_names, sample=20): #esta función estima el expected loglikelihood de los datos.
posterioris_x = []
rb = red.clone()
logl = [] # esta lista contiene las listas de los loglikelihoods para cada punto del dataframe con cada posible cluster: P(c'k',X)
slogl = [] # esta lista contiene la suma de los loglikelihood de cada punto del dataframe para cada muestra de 'clusters' sampleada de P(C|X)
df = dataframe.copy()
for cluster in clusters_names: # para cada cluster añadimos una columna al dataframe con el cluster dado para poder calcular P(c'k',X)
c = [cluster for i in range(df.shape[0])]
df['cluster'] = pd.Series(pd.Categorical(c, categories=clusters_names))
logl.append(rb.logl(df).tolist())
for row in range(df.shape[0]):
lklh = []
for cluster in range(len(clusters_names)):
x = math.exp(logl[cluster][row])
lklh.append(x)
t = sum(lklh)
prob_posterior = [x / t for x in lklh]
posterioris_x.append(prob_posterior)
for i in range(0, sample):
new_c = []
df = dataframe.copy()
for k in range(df.shape[0]):
new_c.append(np.random.choice(np.asarray(clusters_names), p=posterioris_x[k]))
new_c = pd.Series(pd.Categorical(new_c, categories=clusters_names))
df['cluster'] = new_c
slogl.append(red.slogl(df))
return sum(slogl) / len(slogl)'''
def n_param(red, number_of_clusters,categories_df): # dado una red, el nº de clusters y las categorías de cada variable (es inaccesible por pybnesian) calculamos el nº de parámetros estimados
n = number_of_clusters - 1 # comenzamos con el nº de parámetros de la variable cluster que como bien sabemos es number_clusters-1 debido a que trabajamos con un TAN
for var in red.children('cluster'): # como todos son hijos de cluster accedemos a las variables de esta forma.
n = n + (red.num_parents(var)) * (len(categories_df[var]) - 1)
return n
#Parallelized with Pool
'''def structure_logl(M_h, M_n_h, dataframe, clusters_names): #esta función estima el expected loglikelihood de los datos.
"""
Parallelized computation of structure_logl.
"""
rb_n_h = M_n_h.clone()
rb_h = M_h.clone()
df = dataframe.copy()
# Parallelize the computation for each row
with ThreadPoolExecutor() as pool:
row_sums = thread_starmap(pool, compute_row_contribution,
[(row_index, df.copy(), rb_n_h, rb_h, clusters_names) for row_index in range(df.shape[0])]
)
# Aggregate the results
structure_logl = sum(row_sums)
return structure_logl'''
def structure_logl(M_h, M_n_h, dataframe, clusters_names): #esta función estima el expected loglikelihood de los datos.
"""
Sequential computation of structure_logl.
"""
rb_n_h = M_n_h.clone()
rb_h = M_h.clone()
df = dataframe.copy()
row_sums = []
for row_index in range(df.shape[0]):
row_sums.append(compute_row_contribution(row_index, df.copy(), rb_n_h, rb_h, clusters_names))
# Aggregate the results
structure_logl = sum(row_sums)
return structure_logl
def sem(bn, dataframe, categories_df, clusters_names, max_iter=2, em_kmax=50):
clgbn = EM(bn, dataframe, clusters_names, em_kmax) # comenzamos estimando la red naive bayes
best = clgbn.clone()
i = 0 # controla cuando se llega a un punto estacionario el máximo nº de iteraciones a realizar
df = dataframe.copy()
BIC = -2 * structure_logl(clgbn, clgbn, df, clusters_names) + math.log(df.shape[0]) * n_param(clgbn,len(clusters_names),categories_df) # bic de la primera red naive
log_debug(f"Initial BIC: {BIC}")
participant_nodes = list(df.columns.copy()) # Nodos de la red
possible_arcs = list(itertools.permutations(participant_nodes,2)) # Posibles parejas de arcos entre los nodos de la red salvo cluster
# Comenzamos el algoritmo sem
while i < max_iter:
log_debug(f"SEM iteration {i}, current BIC: {BIC}")
s = 0 # controla si se ha mejorado en la iteración
k = 0 # lo utilizamos para ir recorriendo la lista de posibles acciones (en este caso añadir arco)
random.shuffle(possible_arcs) # mezclamos aleatoriamente los posibles arcos que se pueden introducir
while k < len(possible_arcs):
if clgbn.can_add_arc(possible_arcs[k][0], possible_arcs[k][1]): # comprobamos si el arco puede ser añadido
red = pb.DiscreteBN(nodes=clgbn.nodes(),arcs=clgbn.arcs()) # en caso de que pueda ser añadido generamos una nueva red con el arco añadido, estimamos parámetros y comparamos BIC
red.add_arc(possible_arcs[k][0], possible_arcs[k][1])
red = EM(red, df, clusters_names, em_kmax)
l = -2 * structure_logl(red, clgbn, df, clusters_names) + math.log(df.shape[0]) * n_param(red, len(clusters_names), categories_df)
if l >= BIC: # si no mejoramos pasamos al siguiente arco
k = k + 1
else: # si mejoramos BIC se actualiza y red se actualiza
k = len(possible_arcs)
BIC = l
clgbn = red
best = clgbn.clone()
s = s + 1
else:
k = k + 1 # si no se puede introducir el arco pasamos al siguiente
k = 0
possible = list(
clgbn.arcs()) # En este caso necesitamos trabajar con la lista de arcos existentes ya que tratamos de invertir alguno
for element in participant_nodes: # eliminamos los arcos (cluster,variable) ya que estos no deben tocarse
if ('cluster', element) in possible:
possible.remove(('cluster', element))
random.shuffle(possible) # hacemos un random order de los posibles candidatos
while k < len(possible):
if clgbn.can_flip_arc(possible[k][0], possible[k][1]): # si se puede invertir el arco de nuevo probamos y comparamos si se mejora el BIC
red = pb.DiscreteBN(nodes=clgbn.nodes(), arcs=clgbn.arcs())
red.flip_arc(possible[k][0], possible[k][1])
red = EM(red, df, clusters_names, em_kmax)
l = -2 * structure_logl(red, clgbn, df, clusters_names) + math.log(df.shape[0]) * n_param(red, len(clusters_names), categories_df)
if l >= BIC:
k = k + 1
else:
k = len(possible)
BIC = l
clgbn = red
best = clgbn.clone()
s = s + 1
else:
k = k + 1
k = 0
possible = list(
clgbn.arcs()) # de nuevo queremos eliminar arcos por ello los candidatos son los arcos ya existentes salvo los que no se deben eliminar (cluster,variable)
for element in participant_nodes:
if ('cluster', element) in possible:
possible.remove(('cluster', element))
random.shuffle(possible)
while k < len(possible):
red = pb.DiscreteBN(nodes=clgbn.nodes(), arcs=clgbn.arcs())
red.remove_arc(possible[k][0], possible[k][1])
red = EM(red, df, clusters_names, em_kmax)
l = -2 * structure_logl(red, clgbn, df, clusters_names) + math.log(df.shape[0]) * n_param(red,len(clusters_names),categories_df)
if l >= BIC:
k = k + 1
else:
k = len(possible)
BIC = l
clgbn = red
best = clgbn.clone()
s = s + 1
log_debug(f"End of inner loop, new BIC: {BIC}")
if s == 0: # si no se mejora comienza el contador de maxiter
i = i + 1
else:
i = 0
return best
def weighted_choice(elements, weights):
cumulative_weights = np.cumsum(weights)
total = cumulative_weights[-1]
random_value = np.random.uniform(0, total)
return elements[np.searchsorted(cumulative_weights, random_value)]
'''def parallel_generate_combinations(categor, df_columns, num_chunks):
"""
Generate combinations in parallel by splitting the workload into a specified number of chunks.
"""
# Determine the total number of combinations
total_combinations = np.prod([len(cat) for cat in categor])
# Calculate the size of each chunk
chunk_size = total_combinations // num_chunks
ranges = [(i * chunk_size, (i + 1) * chunk_size) for i in range(num_chunks)]
# Ensure the last chunk includes any remaining combinations
ranges[-1] = (ranges[-1][0], total_combinations)
# Generate and process chunks in parallel
with ThreadPoolExecutor() as pool: # Automatically uses all available cores
# Generate chunks of combinations
chunks = thread_starmap(pool, generate_combinations_chunk, [(categor, start, end) for start, end in ranges])
# Process each chunk into a DataFrame
dataframes = thread_starmap(pool, process_combinations_chunk, [(chunk, df_columns) for chunk in chunks])
# Concatenate all DataFrames into a single DataFrame
return pd.concat(dataframes, ignore_index=True)'''
def parallel_generate_combinations(categor, df_columns, num_chunks):
"""
Generate combinations sequentially (renamed to keep compatibility but is sequential).
"""
# Generate all combinations directly
combinations = product(*categor)
# Convert to DataFrame
return pd.DataFrame(combinations, columns=df_columns)
def generate_combinations_chunk(categor, start, end):
"""
Generate a chunk of combinations from the Cartesian product.
"""
combinations = product(*categor)
chunk = [comb for i, comb in enumerate(combinations) if start <= i < end]
return chunk
def process_combinations_chunk(chunk, df_columns):
"""
Convert a chunk of combinations into a DataFrame.
"""
return pd.DataFrame(chunk, columns=df_columns)
def compute_logl_for_cluster(cluster, df, rb, clusters_names):
"""
Compute the log-likelihood for a single cluster.
"""
# Create a column with the cluster value for all rows
c = [cluster] * df.shape[0]
df['cluster'] = pd.Series(pd.Categorical(c, categories=clusters_names))
# Compute the log-likelihood for the cluster
return rb.logl(df).tolist()
def parallel_compute_logl(clusters_names, df, rb):
"""
Sequential computation of log-likelihoods for all clusters.
"""
results = []
for cluster in clusters_names:
results.append(compute_logl_for_cluster(cluster, df.copy(), rb, clusters_names))
return results
def compute_posterior_and_sample(row_index, logl, clusters_names):
"""
Compute P(C|x) and sample a cluster for a single row.
"""
lklh = [] # List of likelihoods for each cluster
for n in range(len(clusters_names)):
x = math.exp(logl[n][row_index]) # Compute P(C,X) for the cluster
lklh.append(x)
# Sample a cluster based on the posterior probabilities
return weighted_choice(clusters_names, lklh)
'''def parallel_sample_clusters(df, logl, clusters_names):
"""
Parallelize the computation of P(C|x) and sampling for all rows.
"""
with ThreadPoolExecutor() as pool:
# Distribute the computation across all rows
new_c = thread_starmap(pool, compute_posterior_and_sample,
[(row_index, logl, clusters_names) for row_index in range(df.shape[0])]
)
return new_c'''
def parallel_sample_clusters(df, logl, clusters_names):
"""
Sequential computation of P(C|x) and sampling for all rows.
"""
new_c = []
for row_index in range(df.shape[0]):
new_c.append(compute_posterior_and_sample(row_index, logl, clusters_names))
return new_c
def compute_row_contribution(row_index, df, rb_n_h, rb_h, clusters_names):
"""
Compute the sum of contributions for a single row:
- Compute posterior probabilities for all clusters.
- Multiply each posterior probability by rb_h.logl(row) and sum the results.
"""
# Compute unnormalized posterior probabilities
unnormalized_probs = []
for cluster in clusters_names:
df['cluster'] = pd.Series(pd.Categorical([cluster] * df.shape[0], categories=clusters_names))
unnormalized_probs.append(math.exp(rb_n_h.logl(df.iloc[[row_index]])[0]))
# Normalize posterior probabilities
total_prob = sum(unnormalized_probs)
posterior_probs = [prob / total_prob for prob in unnormalized_probs]
# Compute the sum of contributions for the row
row_sum = 0
for cluster, posterior_prob in zip(clusters_names, posterior_probs):
df['cluster'] = pd.Series(pd.Categorical([cluster] * df.shape[0], categories=clusters_names))
logl_h = rb_h.logl(df.iloc[[row_index]])[0]
row_sum += posterior_prob * logl_h
return row_sum