-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLPSimplexBatchOp.java
More file actions
90 lines (78 loc) · 3.73 KB
/
LPSimplexBatchOp.java
File metadata and controls
90 lines (78 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.alibaba.alink.devp;
import com.alibaba.alink.common.comqueue.IterativeComQueue;
import com.alibaba.alink.common.comqueue.communication.AllReduce;
import com.alibaba.alink.common.linalg.SparseVector;
import com.alibaba.alink.operator.batch.BatchOperator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import java.util.ArrayList;
public class LPSimplexBatchOp extends BatchOperator<LPSimplexBatchOp> implements LPParams<LPSimplexBatchOp> {
public static final String TABLEAU = "tableau";
public static final String UNBOUNDED = "unbounded";
public static final String COMPLETE = "complete";
public static final String PIVOT_ROW_VALUE = "pivotRowValue"; //Tuple<Double, Integer, Integer, Vector> b/a, row index, task id, value of the pivot
public static final String PIVOT_COL_INDEX = "pivotColIndex";
public static final String PHASE = "phase";
public static final String OBJECTIVE = "objective";
public static final String PSEUDO_OBJECTIVE = "pseudoObjective";
public static final String BASIS = "basis";
public LPSimplexBatchOp(){
super();
}
static DataSet<Row> iterateICQ(DataSet <Row> tableau,
final int maxIter,
DataSet<Row> objective,
DataSet<Row> pseudoObjective,
DataSet<Row> basis){
return new IterativeComQueue()
.initWithPartitionedData(TABLEAU, tableau)
.initWithBroadcastData(OBJECTIVE, objective)
.initWithBroadcastData(PSEUDO_OBJECTIVE, pseudoObjective)
.initWithBroadcastData(BASIS, basis)
.add(new LPSimplexCom())
.add(new AllReduce(PIVOT_ROW_VALUE, null,
new AllReduce.SerializableBiConsumer<double[], double[]>() {
@Override
public void accept(double[] a, double[] b) {
if(a[0]==-1.0 ||
(b[0]>-1.0 && b[1]<a[1]) ||
(b[1]==a[1] && b[0]<a[0]))
{
for (int i = 0; i < a.length; ++i)
a[i] = b[i];
}
}
}))
.setCompareCriterionOfNode0(new LPSimplexIterTermination())
.closeWith(new LPSimplexComplete())
.setMaxIter(maxIter)
.exec();
}
@Override
public LPSimplexBatchOp linkFrom(BatchOperator <?>... inputs) {
BatchOperator<?> in = checkAndGetFirst(inputs);
int n = this.getN();
ArrayList<Double> objective = this.getObjective();
SparseVector lowerBounds = this.getLowerBounds();
Integer[] unbounded = this.getUnbounded();
DataSet<Row> Input = in.getDataSet();
Tuple4<DataSet<Row>, DataSet<Row>, DataSet<Row>, DataSet<Row>> data = null;
try {
data = linprogUtil.addArtificialVar(Input, objective, null, lowerBounds, unbounded);
} catch (Exception e) {
e.printStackTrace();
}
assert data != null;
Input = iterateICQ(data.f0,this.getMaxIter(), data.f1, data.f2, data.f3);
DataSet <Row> iterOutput = Input
.map((MapFunction<Row, Row>) row -> {
return row;
}).returns(new RowTypeInfo(Types.DOUBLE));
this.setOutput(iterOutput, new String[]{"MinObject"});
return this;
}
}