@@ -2,7 +2,7 @@ module ParallelOperations
22
33using Distributed
44
5- import Base: reduce, sum, maximum, minimum
5+ import Base: reduce
66
77export
88 sendto, @sendto ,
@@ -17,19 +17,24 @@ export
1717 allgather,
1818 allreduce,
1919
20- sum, allsum,
21- maximum, allmaximum,
22- minimum, allminimum
20+ allsum,
21+ allmaximum,
22+ allminimum
2323
2424# point-to-point
2525
2626function sendto (p:: Int , expr, data, mod:: Module = Main)
27- @async @spawnat (p, Core. eval (mod, Expr (:(= ), expr, data)))
27+ remotecall_fetch (p) do
28+ Core. eval (mod, Expr (:(= ), expr, data))
29+ end
2830end
2931
3032function sendto (p:: Int , mod:: Module = Main; args... )
31- data = Dict (nm => val for (nm, val) in args)
32- @async @spawnat (p, Core. eval (mod, Expr (:(= ), :parallel_data , data)))
33+ for (nm, val) in args
34+ remotecall_fetch (p) do
35+ Core. eval (mod, Expr (:(= ), nm, val))
36+ end
37+ end
3338end
3439
3540function sendto (p:: Int , f:: Function , expr, mod:: Module = Main; args = ())
@@ -61,7 +66,9 @@ macro sendto(p, expr, mod::Symbol = :Main)
6166end
6267
6368function getfrom (p:: Int , expr, mod:: Module = Main)
64- return fetch (@spawnat (p, Core. eval (mod, expr)))
69+ return remotecall_fetch (p) do
70+ Core. eval (mod, expr)
71+ end
6572end
6673
6774macro getfrom (p, obj, mod:: Symbol = :Main )
9299
93100function bcast (pids:: Array , f:: Function , expr, mod:: Module = Main; args... )
94101 @sync for p in pids
95- @async sendto (p, f, expr, mod; args... )
102+ sendto (p, f, expr, mod; args... )
96103 end
97104end
98105
99106function bcast (pids:: Array , f:: Function , mod:: Module = Main; args... )
100107 @sync for p in pids
101- @async sendto (p, f, mod; args... )
108+ sendto (p, f, mod; args... )
102109 end
103110end
104111
@@ -166,13 +173,22 @@ function allreduce(f::Function, pids::Array, src_expr, target_expr = src_expr, m
166173end
167174
168175# Commonly used functions
169- sum (pids:: Array , expr:: Union{Symbol, Expr} , mod:: Module = Main) = sum (gather (pids, expr, mod))
170- allsum (pids:: Array , src_expr:: Union{Symbol, Expr} , target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, sum (pids, src_expr, mod), mod)
176+ function _allsum (pids:: Array , expr:: Union{Symbol, Expr} , mod:: Module = Main)
177+ return Base. sum (gather (pids, expr, mod))
178+ end
171179
172- maximum (pids:: Array , expr, mod:: Module = Main) = maximum (gather (pids, expr, mod))
173- allmaximum (pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, maximum (pids, src_expr, mod), mod)
180+ allsum (pids:: Array , src_expr:: Union{Symbol, Expr} , target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, _allsum (pids, src_expr, mod), mod)
181+
182+ function _allmaximum (pids:: Array , expr, mod:: Module = Main)
183+ return Base. maximum (gather (pids, expr, mod))
184+ end
185+
186+ allmaximum (pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, _allmaximum (pids, src_expr, mod), mod)
187+
188+ function _allminimum (pids:: Array , expr, mod:: Module = Main)
189+ return Base. minimum (gather (pids, expr, mod))
190+ end
174191
175- minimum (pids:: Array , expr, mod:: Module = Main) = minimum (gather (pids, expr, mod))
176- allminimum (pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, minimum (pids, src_expr, mod), mod)
192+ allminimum (pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast (pids, target_expr, _allminimum (pids, src_expr, mod), mod)
177193
178194end
0 commit comments