diff --git a/core/src/main/java/io/ddf/content/ViewHandler.java b/core/src/main/java/io/ddf/content/ViewHandler.java
index 204e2d0a..288c0728 100644
--- a/core/src/main/java/io/ddf/content/ViewHandler.java
+++ b/core/src/main/java/io/ddf/content/ViewHandler.java
@@ -269,7 +269,7 @@ public String toString() {
     @Override
     public String toSql() {
       if (name == null) {
-        throw new IllegalArgumentException("Missing Operator name from Adatao client for operands[] "
+        throw new IllegalArgumentException("Missing Operator name from DDF client for operands[] "
             + Arrays.toString(operands));
       }
       switch (name) {
diff --git a/core/src/main/java/io/ddf/etl/ATimeSeriesHandler.java b/core/src/main/java/io/ddf/etl/ATimeSeriesHandler.java
new file mode 100644
index 00000000..9bbb6842
--- /dev/null
+++ b/core/src/main/java/io/ddf/etl/ATimeSeriesHandler.java
@@ -0,0 +1,184 @@
+package io.ddf.etl;
+
+
+import io.ddf.DDF;
+import io.ddf.analytics.ABinningHandler.BinningType;
+import io.ddf.exception.DDFException;
+import io.ddf.misc.ADDFFunctionalGroupHandler;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
+
+/**
+ * Base implementation of {@link IHandleTimeSeries} that relies only on generic DDF operations
+ * (SQL, binning, group-by and merge).
+ * <p>
+ * Downsampling is implemented here. The diff-column and moving-average operations are not; a
+ * subclass must override them, and they fail with a {@link DDFException} otherwise.
+ */
+public abstract class ATimeSeriesHandler extends ADDFFunctionalGroupHandler implements IHandleTimeSeries {
+
+  /** Timestamp column, set explicitly or by the most recent single-series downsample call. */
+  protected String mTimestampColumn;
+
+  /** Column identifying individual series; {@code null} or empty means no per-series grouping. */
+  protected String mTsIDColumn = null;
+
+
+  public ATimeSeriesHandler(DDF theDDF) {
+    super(theDDF);
+  }
+
+  @Override
+  public void setTimeStampColumn(String colName) {
+    mTimestampColumn = colName;
+  }
+
+  @Override
+  public String getTimeStampColumn() {
+    return mTimestampColumn;
+  }
+
+  public String getTsIDColumn() {
+    return mTsIDColumn;
+  }
+
+  @Override
+  public void setTsIDColumn(String colName) {
+    this.mTsIDColumn = colName;
+  }
+
+  /**
+   * Splits the range of {@code timestampColumn} into equal-width bins and aggregates the result.
+   * <p>
+   * Bins the column, then groups by it and, when a series-ID column has been set, by that column
+   * as well. Also records {@code timestampColumn} as this handler's timestamp column.
+   *
+   * @param timestampColumn column holding the timestamps
+   * @param aggregateFunctions aggregate expressions, handed unchanged to {@code groupBy}
+   * @param interval width of one bin, expressed in {@code timeUnit}
+   * @param timeUnit unit of {@code interval}; the interval is converted to whole seconds
+   * @return the grouped DDF
+   * @throws DDFException if the interval is shorter than one second or an underlying operation fails
+   */
+  @Override
+  public DDF downsample(String timestampColumn, List aggregateFunctions, int interval, TimeUnit timeUnit)
+      throws DDFException {
+
+    this.mTimestampColumn = timestampColumn;
+    List<String> groupByCols = Lists.newArrayList(timestampColumn);
+    if (mTsIDColumn != null && !mTsIDColumn.isEmpty()) {
+      groupByCols.add(mTsIDColumn);
+    }
+
+    int numBins = getNumBins(timeUnit.toSeconds(interval));
+    DDF binnedDDF = this.getDDF().binning(timestampColumn, BinningType.EQUALINTERVAL.toString(), numBins, null, false,
+        true, true);
+    return binnedDDF.groupBy(groupByCols, aggregateFunctions);
+  }
+
+  /**
+   * Downsamples every series separately and merges the per-series results.
+   * <p>
+   * Looks up the distinct values of {@code tsIDColumn}, filters the DDF once per value, downsamples
+   * each filtered DDF and merges the results in lookup order. A {@code null} or empty
+   * {@code tsIDColumn} downsamples the whole DDF as one series.
+   *
+   * @throws DDFException if {@code tsIDColumn} has no values or an underlying operation fails
+   */
+  @Override
+  public DDF downsample(String timestampColumn, String tsIDColumn, List aggregateFunctions, int interval,
+      TimeUnit timeUnit) throws DDFException {
+
+    this.mTsIDColumn = tsIDColumn;
+    if (tsIDColumn == null || tsIDColumn.isEmpty()) {
+      return downsample(timestampColumn, aggregateFunctions, interval, timeUnit);
+    }
+
+    List<String> seriesIds = getDistinctValues(tsIDColumn);
+    if (seriesIds == null || seriesIds.isEmpty()) {
+      // Without this guard the first lookup below would fail with an IndexOutOfBoundsException.
+      throw new DDFException(String.format("Cannot downsample %s: column %s has no values",
+          this.getDDF().getTableName(), tsIDColumn));
+    }
+
+    DDF newDDF = null;
+    for (String seriesId : seriesIds) {
+      DDF filteredDDF = filterByValue(tsIDColumn, seriesId);
+      filteredDDF.getTimeSeriesHandler().setTsIDColumn(tsIDColumn);
+      DDF nextDDF = filteredDDF.getTimeSeriesHandler().downsample(timestampColumn, aggregateFunctions, interval,
+          timeUnit);
+      newDDF = (newDDF == null) ? nextDDF : newDDF.getJoinsHandler().merge(nextDDF);
+    }
+    return newDDF;
+  }
+
+  @Override
+  public DDF addDiffColumn(String timestampColumn, String colToGetDiff, String diffColumn) throws DDFException {
+    return addDiffColumn(timestampColumn, null, colToGetDiff, diffColumn);
+  }
+
+  /**
+   * Not implemented at this level; a subclass that supports it must override.
+   *
+   * @throws DDFException always
+   */
+  @Override
+  public DDF addDiffColumn(String timestampColumn, String tsIDColumn, String colToGetDiff, String diffColumn)
+      throws DDFException {
+    throw new DDFException("addDiffColumn is not supported by " + getClass().getName());
+  }
+
+  /**
+   * Not implemented at this level; a subclass that supports it must override.
+   *
+   * @throws DDFException always
+   */
+  @Override
+  public DDF computeMovingAverage(String timestampColumn, String tsIDColumn, String colToComputeMovingAverage,
+      String movingAverageColName, int windowSize) throws DDFException {
+    throw new DDFException("computeMovingAverage is not supported by " + getClass().getName());
+  }
+
+  @Override
+  public void saveTimeSeriesToCSV(String pathToStorage) {
+    // TODO: not implemented yet; currently a no-op.
+  }
+
+  /**
+   * Computes how many equal-width bins cover the timestamp range at the given bin width.
+   * <p>
+   * NOTE(review): the range is max - min of the raw column values divided by a width in seconds,
+   * so this presumably expects timestamps expressed in seconds -- confirm against callers.
+   *
+   * @param intervalInSeconds bin width; must be positive
+   * @return the bin count, at least 1
+   * @throws DDFException if {@code intervalInSeconds} is not positive
+   */
+  private int getNumBins(long intervalInSeconds) throws DDFException {
+    if (intervalInSeconds <= 0) {
+      // TimeUnit.toSeconds truncates, so any sub-second interval arrives here as 0.
+      throw new DDFException("Downsampling interval must be at least one second, got " + intervalInSeconds
+          + " seconds");
+    }
+    long minTimeStamp = this.getDDF().getVectorMin(mTimestampColumn).longValue();
+    long maxTimeStamp = this.getDDF().getVectorMax(mTimestampColumn).longValue();
+    long numBins = (maxTimeStamp - minTimeStamp) / intervalInSeconds;
+    // A range narrower than one interval still needs one bin; the clamp keeps the int cast from wrapping.
+    return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, numBins));
+  }
+
+  /** Returns the rows of a {@code SELECT distinct} query on {@code colName} over this DDF's table. */
+  private List<String> getDistinctValues(String colName) throws DDFException {
+    String sqlCmd = String.format("SELECT distinct(%s) FROM %s", colName, this.getDDF().getTableName());
+    return this.getManager().sql(sqlCmd, this.getEngine()).getRows();
+  }
+
+  /** Returns a new DDF selected from this DDF's table where {@code colName} equals {@code value}. */
+  private DDF filterByValue(String colName, String value) throws DDFException {
+    // NOTE(review): value is interpolated verbatim inside single quotes, so an ID that itself contains
+    // a quote yields a malformed query. Escaping is dialect-specific; confirm per engine before changing.
+    String sqlCmd = String.format("SELECT * FROM %s WHERE %s = '%s'", this.getDDF().getTableName(), colName, value);
+    return this.getDDF().getSqlHandler().sql2ddf(sqlCmd);
+  }
+}
diff --git a/core/src/main/java/io/ddf/etl/IHandleTimeSeries.java b/core/src/main/java/io/ddf/etl/IHandleTimeSeries.java
new file mode 100644
index 00000000..813d7bbd
--- /dev/null
+++ b/core/src/main/java/io/ddf/etl/IHandleTimeSeries.java
@@ -0,0 +1,29 @@
+package io.ddf.etl;
+
+import java.util.List;
+import io.ddf.DDF;
+import io.ddf.exception.DDFException;
+import io.ddf.misc.IHandleDDFFunctionalGroup;
+import java.util.concurrent.TimeUnit;
+
+public interface IHandleTimeSeries extends IHandleDDFFunctionalGroup {
+
+  void setTimeStampColumn(String colName);
+
+  void setTsIDColumn(String colName);
+
+  String getTimeStampColumn() throws DDFException;
+
+  DDF downsample(String timestampColumn, List aggregateFunctions, int interval, TimeUnit timeUnit) throws DDFException;
+
+  DDF downsample(String timestampColumn, String tsIDColumn, List aggregateFunctions, int interval, TimeUnit timeUnit) throws DDFException;
+
+  DDF addDiffColumn(String timestampColumn, String colToGetDiff, String diffColName) throws DDFException;
+
+  DDF addDiffColumn(String timestampColumn, String tsIDColumn, String colToGetDiff, String diffColName) throws DDFException;
+
+  DDF computeMovingAverage(String timestampColumn, String tsIDColumn, String colToComputeMovingAverage, String movingAverageColName,
+      int windowSize) throws DDFException;
+
+  void saveTimeSeriesToCSV(String path);
+}
diff --git a/core/src/main/java/io/ddf/misc/IHandleTimeSeries.java b/core/src/main/java/io/ddf/misc/IHandleTimeSeries.java
deleted file mode 100644
index f2addc0a..00000000
--- a/core/src/main/java/io/ddf/misc/IHandleTimeSeries.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package io.ddf.misc;
-
-
-public interface IHandleTimeSeries extends IHandleDDFFunctionalGroup {
-
-}
diff --git a/ddf-conf/ddf.ini b/ddf-conf/ddf.ini
index bce0e287..af556165 100755
--- a/ddf-conf/ddf.ini
+++ b/ddf-conf/ddf.ini
@@ -19,6 +19,7 @@ ISupportMLMetrics = io.ddf.spark.ml.MLMetricsSupporter
 IHandleTransformations = io.ddf.etl.TransformationHandler
 IHandleMutability = io.ddf.content.MutabilityHandler
 IHandleMissingData = io.ddf.etl.MissingDataHandler
+;IHandleTimeSeries = io.ddf.etl.TimeSeriesHandler
 ;IHandleSql = io.ddf.etl.SqlHandler
 ;IRunAlgorithms = io.ddf.analytics.AlgorithmRunner
 MAX_LEVELS_COUNT = 10000
@@ -45,6 +46,8 @@ ISupportMLMetrics = io.ddf.spark.ml.MLMetricsSupporter
 IHandleBinning = io.ddf.spark.analytics.BinningHandler
 IHandleMutability = io.ddf.content.MutabilityHandler
 IHandleMissingData = io.ddf.etl.MissingDataHandler
+IHandleTimeSeries = io.ddf.spark.etl.TimeSeriesHandler
+IHandlePersistence = io.ddf.spark.content.PersistenceHandler
 kmeans = org.apache.spark.mllib.clustering.KMeans
 linearRegressionLasso = org.apache.spark.mllib.regression.LassoWithSGD
 linearRegressionWithSGD = org.apache.spark.mllib.regression.LinearRegressionWithSGD
diff --git a/resources/test/quandl_stocks.csv b/resources/test/quandl_stocks.csv
new file mode 100644
index 00000000..66bc6d08
--- /dev/null
+++ b/resources/test/quandl_stocks.csv
@@ -0,0 +1,300 @@
+AAL,2015-11-04,46.419998,46.450001,45.150002,45.630001,8081400.0,45.505839 +AAL,2015-11-05,45.700001,45.830002,45.029999,45.599998,5864700.0,45.475918 +AAL,2015-11-06,45.509997999999996,45.970001,45.049999,45.34,11669600.0,45.216627 +AAL,2015-11-09,45.169998,45.209998999999996,44.560001,44.650002,7214600.0,44.528506 +AAL,2015-11-10,44.66,45.16,44.02,44.490002000000004,5935300.0,44.368941 +AAL,2015-11-11,44.759997999999996,44.950001,44.369999,44.389998999999996,4413000.0,44.269211 +AAL,2015-11-12,44.290001000000004,44.52,43.82,43.900002,5951100.0,43.780546 +AAL,2015-11-13,43.669998,44.23,43.119999,43.450001,7486100.0,43.33177 +AAL,2015-11-16,42.41,42.84,41.849998,42.830002,10201100.0,42.713458 +AAL,2015-11-17,43.0,43.459998999999996,42.049999,42.299999,9596500.0,42.184898 +AAL,2015-11-18,42.32,42.32,41.009997999999996,41.75,11071100.0,41.636395 +AAL,2015-11-19,41.939999,42.73,41.889998999999996,42.299999,6545500.0,42.184898 +AAL,2015-11-20,42.400002,42.709998999999996,42.049999,42.200001,5656000.0,42.085172 +AAL,2015-11-23,42.110001000000004,42.740002000000004,41.810001,42.299999,4166100.0,42.184898 +AAL,2015-11-24,41.02,41.639998999999996,40.450001,41.23,7578000.0,41.11781 +AAL,2015-11-25,41.349998,41.5,40.959998999999996,41.310001,4276600.0,41.197594 +AAL,2015-11-27,41.380001,42.25,41.169998,41.689999,3066600.0,41.576557 +AAL,2015-11-30,41.75,41.990002000000004,40.98,41.259997999999996,6450900.0,41.147727 +AAL,2015-12-01,41.470001,43.299999,41.299999,43.259997999999996,8912900.0,43.142285 +AAL,2015-12-02,44.09,44.529999,43.529999,43.82,8010400.0,43.700762 +AAL,2015-12-03,43.650002,44.599998,42.919998,43.27,7235300.0,43.15226 +AAL,2015-12-04,43.82,44.990002000000004,43.82,44.970001,9460900.0,44.847635 +AAL,2015-12-07,45.349998,46.490002000000004,45.220001,45.779999,8911700.0,45.655428 +AAL,2015-12-08,45.099998,45.509997999999996,44.07,44.549999,9043700.0,44.428776 +AAL,2015-12-09,44.299999,45.189999,43.189999,43.650002,8001000.0,43.531227 
+AAL,2015-12-10,43.790001000000004,44.48,43.540001000000004,44.09,7955600.0,43.970028000000006 +AAL,2015-12-11,43.279999,43.59,42.060001,42.09,10235200.0,41.97547 +AAL,2015-12-14,42.259997999999996,42.869999,41.080002,41.77,10750500.0,41.656341 +AAL,2015-12-15,41.970001,42.82,41.919998,42.599998,7068900.0,42.484081 +AAL,2015-12-16,42.990002000000004,43.490002000000004,42.119999,43.200001,7850900.0,43.08245 +AAON,2015-11-04,22.74,24.58,22.74,24.48,303100.0,24.372931 +AAON,2015-11-05,24.42,24.42,23.83,24.4,211700.0,24.293281 +AAON,2015-11-06,24.280001000000002,24.5,23.4,24.23,240700.0,24.124024 +AAON,2015-11-09,24.209999,24.559998999999998,23.959999,24.35,176900.0,24.2435 +AAON,2015-11-10,24.219998999999998,24.559998999999998,23.360001,24.440001000000002,175400.0,24.333107000000002 +AAON,2015-11-11,24.440001000000002,25.030001000000002,24.24,24.639999,254600.0,24.532231 +AAON,2015-11-12,24.530001000000002,24.559998999999998,23.639999,24.030001000000002,226500.0,23.9249 +AAON,2015-11-13,23.99,24.25,23.41,23.48,223000.0,23.377304000000002 +AAON,2015-11-16,23.389999,23.709999,23.389999,23.5,229700.0,23.397217 +AAON,2015-11-17,23.6,24.049999,23.379998999999998,23.639999,159900.0,23.536604 +AAON,2015-11-18,23.77,24.200001,23.620001000000002,24.110001,188400.0,24.00455 +AAON,2015-11-19,24.120001000000002,24.280001000000002,23.799999,23.959999,145400.0,23.855204999999998 +AAON,2015-11-20,24.059998999999998,24.389999,23.799999,24.129998999999998,167300.0,24.024461 +AAON,2015-11-23,24.030001000000002,24.719998999999998,24.030001000000002,24.459999,149000.0,24.353018 +AAON,2015-11-24,24.280001000000002,24.860001,24.280001000000002,24.77,137300.0,24.661663 +AAON,2015-11-25,24.74,25.200001,24.6,24.98,146200.0,24.870744000000002 +AAON,2015-11-27,24.940001000000002,25.34,24.76,25.15,102100.0,25.04 +AAON,2015-11-30,25.120001000000002,25.129998999999998,24.639999,24.709999,142900.0,24.709999 +AAON,2015-12-01,24.83,24.9,24.23,24.309998999999998,186600.0,24.309998999999998 
+AAON,2015-12-02,24.27,24.52,24.049999,24.1,122200.0,24.1 +AAON,2015-12-03,24.209999,24.26,23.610001,23.74,184700.0,23.74 +AAON,2015-12-04,23.75,24.610001,23.700001,24.530001000000002,164700.0,24.530001000000002 +AAON,2015-12-07,24.459999,24.459999,23.84,24.059998999999998,129500.0,24.059998999999998 +AAON,2015-12-08,23.860001,24.58,23.620001000000002,24.34,274600.0,24.34 +AAON,2015-12-09,24.27,24.73,23.389999,23.42,206900.0,23.42 +AAON,2015-12-10,23.440001000000002,23.530001000000002,23.02,23.33,363500.0,23.33 +AAON,2015-12-11,22.860001,23.5,22.629998999999998,22.74,346900.0,22.74 +AAON,2015-12-14,22.639999,23.540001,22.639999,23.51,316200.0,23.51 +AAON,2015-12-15,23.66,23.76,22.85,23.379998999999998,169600.0,23.379998999999998 +AAON,2015-12-16,23.5,23.790001,23.16,23.389999,131600.0,23.389999 +AAPL,2015-11-04,123.129997,123.82,121.620003,122.0,44886100.0,120.824373 +AAPL,2015-11-05,121.849998,122.690002,120.18,120.91999799999999,39552700.0,120.267398 +AAPL,2015-11-06,121.110001,121.809998,120.620003,121.059998,33042300.0,120.406641 +AAPL,2015-11-09,120.959999,121.809998,120.050003,120.57,33871400.0,119.919288 +AAPL,2015-11-10,116.900002,118.07,116.059998,116.769997,59127900.0,116.139793 +AAPL,2015-11-11,116.370003,117.41999799999999,115.209999,116.110001,45218000.0,115.48335900000001 +AAPL,2015-11-12,116.260002,116.82,115.650002,115.720001,32525600.0,115.09546499999999 +AAPL,2015-11-13,115.199997,115.57,112.269997,112.339996,45812400.0,111.73370200000001 +AAPL,2015-11-16,111.379997,114.239998,111.0,114.18,38106700.0,113.563775 +AAPL,2015-11-17,114.91999799999999,115.050003,113.32,113.690002,27616900.0,113.076422 +AAPL,2015-11-18,115.760002,117.489998,115.5,117.290001,46674700.0,116.65699099999999 +AAPL,2015-11-19,117.639999,119.75,116.760002,118.779999,43295800.0,118.138948 +AAPL,2015-11-20,119.199997,119.91999799999999,118.849998,119.300003,34287100.0,118.656145 +AAPL,2015-11-23,119.269997,119.730003,117.339996,117.75,32482500.0,117.114508 
+AAPL,2015-11-24,117.33000200000001,119.349998,117.120003,118.879997,42803200.0,118.23840600000001 +AAPL,2015-11-25,119.209999,119.230003,117.91999799999999,118.029999,21388300.0,117.392995 +AAPL,2015-11-27,118.290001,118.410004,117.599998,117.809998,13046400.0,117.174181 +AAPL,2015-11-30,117.989998,119.410004,117.75,118.300003,39180300.0,117.661542 +AAPL,2015-12-01,118.75,118.809998,116.860001,117.339996,34852400.0,116.706717 +AAPL,2015-12-02,117.339996,118.110001,116.08000200000001,116.279999,33386600.0,115.65243999999998 +AAPL,2015-12-03,116.550003,116.790001,114.220001,115.199997,41569500.0,114.578267 +AAPL,2015-12-04,115.290001,119.25,115.110001,119.029999,57777000.0,118.387598 +AAPL,2015-12-07,118.980003,119.860001,117.809998,118.279999,32084200.0,117.641646 +AAPL,2015-12-08,117.519997,118.599998,116.860001,118.230003,34309500.0,117.591921 +AAPL,2015-12-09,117.639999,117.690002,115.08000200000001,115.620003,46361400.0,114.99600600000001 +AAPL,2015-12-10,116.040001,116.940002,115.510002,116.16999799999999,29212700.0,115.54303300000001 +AAPL,2015-12-11,115.190002,115.389999,112.849998,113.18,46886200.0,112.569172 +AAPL,2015-12-14,112.18,112.68,109.790001,112.480003,64318700.0,111.87295300000001 +AAPL,2015-12-15,111.940002,112.800003,110.349998,110.489998,52978100.0,109.89368799999998 +AAPL,2015-12-16,111.07,111.989998,108.800003,111.339996,56238500.0,110.739099 +AAWW,2015-11-04,42.459998999999996,42.970001,41.060001,42.580002,693700.0,42.580002 +AAWW,2015-11-05,42.68,43.869999,38.689999,39.41,924400.0,39.41 +AAWW,2015-11-06,38.860001000000004,41.57,38.389998999999996,41.32,290900.0,41.32 +AAWW,2015-11-09,41.23,41.23,39.32,39.689999,275000.0,39.689999 +AAWW,2015-11-10,39.669998,40.919998,39.669998,40.759997999999996,205800.0,40.759997999999996 +AAWW,2015-11-11,40.990002000000004,40.990002000000004,40.060001,40.73,188400.0,40.73 +AAWW,2015-11-12,40.119999,40.889998999999996,39.380001,39.52,269100.0,39.52 
+AAWW,2015-11-13,39.349998,39.939999,38.950001,39.330002,139300.0,39.330002 +AAWW,2015-11-16,39.099998,39.970001,38.450001,39.869999,175700.0,39.869999 +AAWW,2015-11-17,40.09,41.0,39.75,40.27,211900.0,40.27 +AAWW,2015-11-18,40.560001,40.849998,39.810001,40.599998,140500.0,40.599998 +AAWW,2015-11-19,40.689999,41.099998,39.98,40.880001,129800.0,40.880001 +AAWW,2015-11-20,41.080002,41.299999,40.459998999999996,40.610001000000004,129900.0,40.610001000000004 +AAWW,2015-11-23,40.290001000000004,41.389998999999996,40.02,40.759997999999996,154100.0,40.759997999999996 +AAWW,2015-11-24,40.459998999999996,41.290001000000004,39.91,41.080002,117100.0,41.080002 +AAWW,2015-11-25,41.040001000000004,41.040001000000004,40.43,40.77,125700.0,40.77 +AAWW,2015-11-27,40.919998,41.650002,40.900002,40.970001,92800.0,40.970001 +AAWW,2015-11-30,41.16,41.650002,40.529999,41.32,210800.0,41.32 +AAWW,2015-12-01,41.369999,41.970001,40.810001,41.919998,129600.0,41.919998 +AAWW,2015-12-02,41.82,41.82,40.610001000000004,40.759997999999996,164400.0,40.759997999999996 +AAWW,2015-12-03,40.77,41.139998999999996,40.18,40.689999,159500.0,40.689999 +AAWW,2015-12-04,40.709998999999996,41.59,40.259997999999996,41.27,231700.0,41.27 +AAWW,2015-12-07,41.18,41.98,39.299999,39.380001,214300.0,39.380001 +AAWW,2015-12-08,38.77,39.279999,38.02,38.369999,160600.0,38.369999 +AAWW,2015-12-09,38.099998,39.68,38.099998,39.380001,311600.0,39.380001 +AAWW,2015-12-10,39.400002,40.529999,39.369999,40.27,178000.0,40.27 +AAWW,2015-12-11,39.52,40.34,38.259997999999996,38.57,222900.0,38.57 +AAWW,2015-12-14,38.540001000000004,39.299999,37.849998,38.919998,308600.0,38.919998 +AAWW,2015-12-15,39.16,39.450001,38.700001,39.139998999999996,249800.0,39.139998999999996 +AAWW,2015-12-16,39.310001,40.43,38.959998999999996,40.279999,258200.0,40.279999 +ABAX,2015-11-04,51.529999,52.529999,51.459998999999996,52.09,155300.0,51.83644 +ABAX,2015-11-05,52.0,53.049999,51.639998999999996,52.869999,142700.0,52.612642 
+ABAX,2015-11-06,52.59,53.07,51.650002,52.91,124600.0,52.652448 +ABAX,2015-11-09,52.799999,52.799999,51.490002000000004,52.25,72500.0,51.995661 +ABAX,2015-11-10,52.080002,52.66,51.779999,52.619999,57000.0,52.363859000000005 +ABAX,2015-11-11,53.0,54.98,53.0,53.240002000000004,310600.0,52.980844 +ABAX,2015-11-12,53.25,53.75,51.099998,51.52,257900.0,51.269215 +ABAX,2015-11-13,51.220001,53.98,50.389998999999996,51.029999,131000.0,50.781597999999995 +ABAX,2015-11-16,51.119999,51.950001,50.900002,51.759997999999996,147700.0,51.508044 +ABAX,2015-11-17,51.939999,53.240002000000004,51.450001,52.419998,140800.0,52.164832000000004 +ABAX,2015-11-18,52.52,52.52,51.66,52.220001,178400.0,51.965808 +ABAX,2015-11-19,52.009997999999996,52.459998999999996,51.02,51.16,95900.0,50.910967 +ABAX,2015-11-20,51.540001000000004,52.060001,51.279999,51.689999,111300.0,51.438386 +ABAX,2015-11-23,51.93,55.150002,51.93,54.919998,243400.0,54.652662 +ABAX,2015-11-24,54.57,54.889998999999996,53.200001,53.5,206300.0,53.239576 +ABAX,2015-11-25,53.5,53.759997999999996,52.98,53.48,61700.0,53.219673 +ABAX,2015-11-27,53.48,54.09,53.150002,53.380001,39100.0,53.120162 +ABAX,2015-11-30,53.549999,53.759997999999996,52.880001,53.169998,94900.0,52.911181000000006 +ABAX,2015-12-01,53.330002,53.93,52.43,53.09,122600.0,52.941099 +ABAX,2015-12-02,53.130001,54.16,52.720001,53.77,126600.0,53.619192000000005 +ABAX,2015-12-03,54.029999,54.830002,52.259997999999996,53.900002,192300.0,53.748828 +ABAX,2015-12-04,53.919998,54.5,53.540001000000004,54.290001000000004,73400.0,54.137733999999995 +ABAX,2015-12-07,54.150002,54.509997999999996,53.860001000000004,54.200001,88400.0,54.047986 +ABAX,2015-12-08,53.66,54.68,53.5,54.220001,52400.0,54.06793100000001 +ABAX,2015-12-09,54.099998,54.349998,52.709998999999996,53.049999,66500.0,52.90121 +ABAX,2015-12-10,52.970001,54.209998999999996,52.970001,53.59,72500.0,53.439696 +ABAX,2015-12-11,52.810001,52.860001000000004,51.450001,51.700001,121000.0,51.554998 
+ABAX,2015-12-14,51.990002000000004,55.130001,51.040001000000004,53.009997999999996,233800.0,52.861321 +ABAX,2015-12-15,53.290001000000004,53.669998,51.959998999999996,52.200001,160400.0,52.053596 +ABAX,2015-12-16,52.41,53.860001000000004,52.41,53.540001000000004,74200.0,53.389837 +ABCB,2015-11-04,31.379998999999998,31.59,31.209999,31.549999,202300.0,31.504156 +ABCB,2015-11-05,31.540001,32.240002000000004,31.4,32.189999,189100.0,32.143225 +ABCB,2015-11-06,32.130001,33.490002000000004,32.130001,33.490002000000004,310500.0,33.441339 +ABCB,2015-11-09,33.419998,33.580002,33.130001,33.459998999999996,206200.0,33.41138 +ABCB,2015-11-10,33.459998999999996,33.990002000000004,33.400002,33.66,140200.0,33.611090000000004 +ABCB,2015-11-11,33.810001,33.970001,33.150002,33.259997999999996,95500.0,33.21167 +ABCB,2015-11-12,33.040001000000004,33.349998,32.68,32.900002,135900.0,32.852196 +ABCB,2015-11-13,32.790001000000004,33.049999,32.0,32.240002000000004,145700.0,32.193156 +ABCB,2015-11-16,32.099998,32.57,31.33,32.400002,200200.0,32.352923 +ABCB,2015-11-17,32.5,33.259997999999996,32.450001,32.900002,230800.0,32.852196 +ABCB,2015-11-18,32.959998999999996,33.099998,32.470001,33.080002,158700.0,33.031935 +ABCB,2015-11-19,33.099998,33.650002,32.919998,33.580002,166000.0,33.531209000000004 +ABCB,2015-11-20,33.630001,33.970001,33.200001,33.790001000000004,327000.0,33.740903 +ABCB,2015-11-23,33.950001,34.220001,33.91,34.110001000000004,207700.0,34.060437 +ABCB,2015-11-24,33.860001000000004,34.240002000000004,33.509997999999996,33.610001000000004,158100.0,33.561164 +ABCB,2015-11-25,33.599998,33.82,33.5,33.77,141000.0,33.720931 +ABCB,2015-11-27,33.740002000000004,34.419998,33.630001,34.380001,82200.0,34.330045 +ABCB,2015-11-30,34.52,34.639998999999996,34.040001000000004,34.189999,202400.0,34.140319 +ABCB,2015-12-01,34.400002,34.59,34.16,34.459998999999996,258000.0,34.409927 +ABCB,2015-12-02,34.5,34.759997999999996,34.150002,34.200001,266500.0,34.150307 
+ABCB,2015-12-03,34.5,34.619999,33.959998999999996,34.07,158200.0,34.020495000000004 +ABCB,2015-12-04,34.119999,34.93,34.040001000000004,34.880001,161200.0,34.829319 +ABCB,2015-12-07,34.900002,34.990002000000004,34.549999,34.84,235900.0,34.789376000000004 +ABCB,2015-12-08,34.650002,35.209998999999996,34.650002,34.900002,216700.0,34.84929 +ABCB,2015-12-09,34.75,34.919998,34.049999,34.290001000000004,290000.0,34.240176 +ABCB,2015-12-10,34.209998999999996,34.299999,33.400002,33.490002000000004,248100.0,33.441339 +ABCB,2015-12-11,33.0,33.610001000000004,32.48,32.849998,286600.0,32.802265999999996 +ABCB,2015-12-14,32.82,33.799999,32.630001,33.560001,302500.0,33.511237 +ABCB,2015-12-15,33.889998999999996,34.709998999999996,32.599998,34.419998,189700.0,34.369984 +ABCB,2015-12-16,34.740002000000004,34.740002000000004,33.77,34.389998999999996,379300.0,34.340029 +ABCO,2015-11-04,43.540001000000004,44.419998,43.16,44.099998,254100.0,44.099998 +ABCO,2015-11-05,44.110001000000004,45.349998,43.669998,44.209998999999996,202900.0,44.209998999999996 +ABCO,2015-11-06,50.029999,53.740002000000004,47.330002,53.330002,857100.0,53.330002 +ABCO,2015-11-09,53.09,53.740002000000004,50.709998999999996,52.470001,315500.0,52.470001 +ABCO,2015-11-10,51.689999,53.029999,51.439999,52.869999,388100.0,52.869999 +ABCO,2015-11-11,53.310001,53.509997999999996,52.02,53.200001,219400.0,53.200001 +ABCO,2015-11-12,52.970001,53.439999,52.48,53.009997999999996,250800.0,53.009997999999996 +ABCO,2015-11-13,52.669998,52.919998,51.919998,52.639998999999996,209600.0,52.639998999999996 +ABCO,2015-11-16,52.439999,52.630001,51.91,52.25,165900.0,52.25 +ABCO,2015-11-17,53.279999,53.619999,51.970001,52.32,322500.0,52.32 +ABCO,2015-11-18,52.240002000000004,53.25,51.790001000000004,52.25,169500.0,52.25 +ABCO,2015-11-19,52.02,52.32,51.450001,51.720001,209500.0,51.720001 +ABCO,2015-11-20,51.990002000000004,51.990002000000004,50.82,51.040001000000004,290400.0,51.040001000000004 
+ABCO,2015-11-23,50.919998,52.23,50.349998,51.619999,232700.0,51.619999 +ABCO,2015-11-24,51.23,52.299999,50.959998999999996,52.189999,266900.0,52.189999 +ABCO,2015-11-25,52.25,53.07,52.209998999999996,52.73,212200.0,52.73 +ABCO,2015-11-27,52.610001000000004,53.470001,52.27,52.82,49000.0,52.82 +ABCO,2015-11-30,53.0,53.91,52.75,53.84,199700.0,53.84 +ABCO,2015-12-01,53.959998999999996,54.59,53.700001,54.200001,325000.0,54.200001 +ABCO,2015-12-02,54.080002,54.77,52.68,54.209998999999996,244400.0,54.209998999999996 +ABCO,2015-12-03,54.25,54.84,51.459998999999996,52.290001000000004,268900.0,52.290001000000004 +ABCO,2015-12-04,52.240002000000004,53.130001,51.98,52.990002000000004,181900.0,52.990002000000004 +ABCO,2015-12-07,53.029999,53.029999,50.900002,51.43,229100.0,51.43 +ABCO,2015-12-08,51.25,52.650002,50.970001,52.279999,181700.0,52.279999 +ABCO,2015-12-09,52.169998,52.52,50.990002000000004,51.16,191400.0,51.16 +ABCO,2015-12-10,51.290001000000004,52.139998999999996,50.619999,51.549999,322300.0,51.549999 +ABCO,2015-12-11,50.68,51.41,50.369999,50.68,205200.0,50.68 +ABCO,2015-12-14,50.75,50.93,48.720001,49.529999,252600.0,49.529999 +ABCO,2015-12-15,49.720001,51.310001,49.0,50.959998999999996,257000.0,50.959998999999996 +ABCO,2015-12-16,51.07,51.32,49.470001,49.91,364000.0,49.91 +ACAD,2015-11-04,42.25,43.299999,41.599998,42.290001000000004,1751000.0,42.290001000000004 +ACAD,2015-11-05,42.07,42.41,40.330002,41.540001000000004,1410600.0,41.540001000000004 +ACAD,2015-11-06,41.150002,41.73,34.970001,36.75,4266000.0,36.75 +ACAD,2015-11-09,36.540001000000004,36.799999,35.02,35.209998999999996,2223700.0,35.209998999999996 +ACAD,2015-11-10,35.0,36.18,34.82,35.970001,1256800.0,35.970001 +ACAD,2015-11-11,36.220001,36.889998999999996,35.610001000000004,35.66,886200.0,35.66 +ACAD,2015-11-12,35.220001,36.380001,34.869999,34.970001,858100.0,34.970001 +ACAD,2015-11-13,34.880001,35.959998999999996,34.509997999999996,35.049999,1110000.0,35.049999 
+ACAD,2015-11-16,34.790001000000004,35.470001,33.700001,34.889998999999996,997400.0,34.889998999999996 +ACAD,2015-11-17,35.09,35.400002,34.25,34.57,1122800.0,34.57 +ACAD,2015-11-18,34.639998999999996,36.330002,34.330002,36.310001,1190400.0,36.310001 +ACAD,2015-11-19,36.389998999999996,36.98,35.59,35.869999,832500.0,35.869999 +ACAD,2015-11-20,36.049999,36.759997999999996,35.77,36.0,708000.0,36.0 +ACAD,2015-11-23,35.779999,37.560001,35.779999,37.07,891400.0,37.07 +ACAD,2015-11-24,36.91,37.93,36.509997999999996,37.619999,968800.0,37.619999 +ACAD,2015-11-25,37.509997999999996,39.150002,37.509997999999996,38.279999,1144900.0,38.279999 +ACAD,2015-11-27,38.509997999999996,39.360001000000004,38.330002,39.200001,577300.0,39.200001 +ACAD,2015-11-30,39.25,39.41,37.77,37.950001,952700.0,37.950001 +ACAD,2015-12-01,38.299999,38.299999,36.299999,37.509997999999996,798000.0,37.509997999999996 +ACAD,2015-12-02,37.5,38.549999,37.209998999999996,37.48,949000.0,37.48 +ACAD,2015-12-03,37.639998999999996,37.639998999999996,34.84,35.240002000000004,1045400.0,35.240002000000004 +ACAD,2015-12-04,35.290001000000004,36.279999,34.57,36.150002,934300.0,36.150002 +ACAD,2015-12-07,36.0,36.060001,34.279999,34.43,943500.0,34.43 +ACAD,2015-12-08,34.029999,35.48,33.360001000000004,35.279999,784800.0,35.279999 +ACAD,2015-12-09,35.02,35.27,34.110001000000004,34.290001000000004,667200.0,34.290001000000004 +ACAD,2015-12-10,34.279999,35.049999,33.990002000000004,34.650002,635900.0,34.650002 +ACAD,2015-12-11,33.919998,34.799999,32.380001,32.5,959500.0,32.5 +ACAD,2015-12-14,32.459998999999996,33.970001,32.330002,33.48,1277000.0,33.48 +ACAD,2015-12-15,34.82,37.900002,34.240002000000004,34.599998,2290600.0,34.599998 +ACAD,2015-12-16,35.040001000000004,35.98,33.990002000000004,35.970001,1252900.0,35.970001 +ACAS,2015-11-04,12.88,13.11,12.82,12.82,2292300.0,12.82 +ACAS,2015-11-05,12.89,13.73,12.76,13.71,9241400.0,13.71 +ACAS,2015-11-06,13.9,14.12,13.79,14.06,5562700.0,14.06 
+ACAS,2015-11-09,13.93,14.24,13.93,14.06,5196300.0,14.06 +ACAS,2015-11-10,14.03,14.27,14.03,14.23,5864600.0,14.23 +ACAS,2015-11-11,14.25,14.45,14.21,14.33,5450200.0,14.33 +ACAS,2015-11-12,14.25,14.53,14.19,14.4,4006700.0,14.4 +ACAS,2015-11-13,14.38,14.49,14.19,14.31,6318800.0,14.31 +ACAS,2015-11-16,14.26,14.79,14.24,14.71,4496900.0,14.71 +ACAS,2015-11-17,14.71,14.74,14.43,14.44,4653300.0,14.44 +ACAS,2015-11-18,14.54,14.56,14.35,14.39,2383400.0,14.39 +ACAS,2015-11-19,14.34,14.49,14.29,14.37,1312600.0,14.37 +ACAS,2015-11-20,14.41,14.46,14.27,14.33,1612800.0,14.33 +ACAS,2015-11-23,14.33,14.4,14.06,14.09,2853800.0,14.09 +ACAS,2015-11-24,14.05,14.21,13.75,13.9,2209600.0,13.9 +ACAS,2015-11-25,14.86,15.24,14.61,15.22,11269200.0,15.22 +ACAS,2015-11-27,15.26,15.67,15.25,15.54,4442000.0,15.54 +ACAS,2015-11-30,15.67,15.87,15.47,15.65,5146800.0,15.65 +ACAS,2015-12-01,15.68,15.74,15.53,15.55,2685000.0,15.55 +ACAS,2015-12-02,15.5,15.51,15.18,15.24,2754500.0,15.24 +ACAS,2015-12-03,15.33,15.45,15.13,15.15,3295200.0,15.15 +ACAS,2015-12-04,15.17,15.32,15.14,15.24,2696500.0,15.24 +ACAS,2015-12-07,15.19,15.26,14.69,14.72,5925800.0,14.72 +ACAS,2015-12-08,14.52,14.67,14.06,14.16,5549200.0,14.16 +ACAS,2015-12-09,14.13,14.16,13.59,13.82,8159700.0,13.82 +ACAS,2015-12-10,13.8,14.45,13.64,14.17,7842800.0,14.17 +ACAS,2015-12-11,14.0,14.08,13.64,13.67,4464100.0,13.67 +ACAS,2015-12-14,13.69,13.8,13.03,13.24,6392800.0,13.24 +ACAS,2015-12-15,13.29,14.04,13.29,13.98,7284200.0,13.98 +ACAS,2015-12-16,13.84,14.25,13.82,14.03,3553800.0,14.03 +ACAT,2015-11-04,22.059998999999998,22.440001000000002,21.719998999999998,21.85,197500.0,21.722312 +ACAT,2015-11-05,21.83,21.879998999999998,21.209999,21.52,94800.0,21.39424 +ACAT,2015-11-06,21.35,22.059998999999998,21.27,21.950001,104800.0,21.821728 +ACAT,2015-11-09,21.799999,22.33,21.25,21.59,126500.0,21.463831 +ACAT,2015-11-10,21.530001000000002,21.629998999999998,20.969998999999998,21.24,159500.0,21.115876 
+ACAT,2015-11-11,21.27,21.629998999999998,20.32,20.35,134700.0,20.231077 +ACAT,2015-11-12,20.25,20.99,19.9,20.16,157100.0,20.042187 +ACAT,2015-11-13,20.08,20.18,19.49,19.629998999999998,127500.0,19.515284 +ACAT,2015-11-16,19.6,20.67,19.42,20.299999,179400.0,20.181369 +ACAT,2015-11-17,19.6,20.549999,19.6,19.77,111300.0,19.654467 +ACAT,2015-11-18,19.73,20.540001,19.34,20.49,134000.0,20.370259 +ACAT,2015-11-19,20.5,21.110001,20.08,21.049999,150400.0,20.926986 +ACAT,2015-11-20,21.23,21.700001,21.030001000000002,21.58,169800.0,21.453889 +ACAT,2015-11-23,21.59,22.139999,21.290001,21.389999,142100.0,21.264999 +ACAT,2015-11-24,21.190001000000002,21.74,20.959999,21.549999,71600.0,21.549999 +ACAT,2015-11-25,21.5,21.700001,20.84,21.6,137200.0,21.6 +ACAT,2015-11-27,21.540001,21.9,21.49,21.780001000000002,56700.0,21.780001000000002 +ACAT,2015-11-30,21.969998999999998,22.4,21.620001000000002,22.26,174100.0,22.26 +ACAT,2015-12-01,22.42,22.530001000000002,22.07,22.18,159900.0,22.18 +ACAT,2015-12-02,22.120001000000002,22.370001000000002,20.66,21.889999,198000.0,21.889999 +ACAT,2015-12-03,21.959999,22.52,20.93,20.950001,129700.0,20.950001 +ACAT,2015-12-04,20.780001000000002,21.040001,20.34,20.5,139000.0,20.5 +ACAT,2015-12-07,20.370001000000002,20.76,19.799999,20.139999,128900.0,20.139999 +ACAT,2015-12-08,20.049999,20.67,20.0,20.290001,183300.0,20.290001 +ACAT,2015-12-09,20.379998999999998,20.65,19.16,19.43,158700.0,19.43 +ACAT,2015-12-10,19.389999,20.059998999999998,19.389999,19.58,195700.0,19.58 +ACAT,2015-12-11,19.18,19.4,18.200001,18.23,193300.0,18.23 +ACAT,2015-12-14,18.139999,18.52,17.360001,17.389999,184700.0,17.389999 +ACAT,2015-12-15,17.35,17.719998999999998,16.74,17.26,268000.0,17.26 +ACAT,2015-12-16,17.360001,17.66,17.200001,17.280001000000002,185100.0,17.280001000000002 diff --git a/resources/test/quandl_stocks_small.csv b/resources/test/quandl_stocks_small.csv new file mode 100644 index 00000000..bff7a52f --- /dev/null +++ b/resources/test/quandl_stocks_small.csv @@ -0,0 
+1,31 @@ +AAL,2015-11-04,46.419998,46.450001,45.150002,45.630001,8081400.0,45.505839 +AAL,2015-11-05,45.700001,45.830002,45.029999,45.599998,5864700.0,45.475918 +AAL,2015-11-06,45.509997999999996,45.970001,45.049999,45.34,11669600.0,45.216627 +AAL,2015-11-09,45.169998,45.209998999999996,44.560001,44.650002,7214600.0,44.528506 +AAON,2015-11-04,22.74,24.58,22.74,24.48,303100.0,24.372931 +AAON,2015-11-05,24.42,24.42,23.83,24.4,211700.0,24.293281 +AAON,2015-11-06,24.280001000000002,24.5,23.4,24.23,240700.0,24.124024 +AAON,2015-11-09,24.209999,24.559998999999998,23.959999,24.35,176900.0,24.2435 +AAPL,2015-11-04,123.129997,123.82,121.620003,122.0,44886100.0,120.824373 +AAPL,2015-11-05,121.849998,122.690002,120.18,120.91999799999999,39552700.0,120.267398 +AAPL,2015-11-06,121.110001,121.809998,120.620003,121.059998,33042300.0,120.406641 +AAPL,2015-11-09,120.959999,121.809998,120.050003,120.57,33871400.0,119.919288 +AAWW,2015-11-04,42.459998999999996,42.970001,41.060001,42.580002,693700.0,42.580002 +AAWW,2015-11-05,42.68,43.869999,38.689999,39.41,924400.0,39.41 +AAWW,2015-11-06,38.860001000000004,41.57,38.389998999999996,41.32,290900.0,41.32 +AAWW,2015-11-09,41.23,41.23,39.32,39.689999,275000.0,39.689999 +ABAX,2015-11-04,51.529999,52.529999,51.459998999999996,52.09,155300.0,51.83644 +ABAX,2015-11-05,52.0,53.049999,51.639998999999996,52.869999,142700.0,52.612642 +ABAX,2015-11-06,52.59,53.07,51.650002,52.91,124600.0,52.652448 +ABAX,2015-11-09,52.799999,52.799999,51.490002000000004,52.25,72500.0,51.995661 +ACAD,2015-11-04,42.25,43.299999,41.599998,42.290001000000004,1751000.0,42.290001000000004 +ACAD,2015-11-05,42.07,42.41,40.330002,41.540001000000004,1410600.0,41.540001000000004 +ACAD,2015-11-06,41.150002,41.73,34.970001,36.75,4266000.0,36.75 +ACAD,2015-11-09,36.540001000000004,36.799999,35.02,35.209998999999996,2223700.0,35.209998999999996 +ACAS,2015-11-04,12.88,13.11,12.82,12.82,2292300.0,12.82 +ACAS,2015-11-05,12.89,13.73,12.76,13.71,9241400.0,13.71 
+ACAS,2015-11-06,13.9,14.12,13.79,14.06,5562700.0,14.06 +ACAT,2015-11-04,22.059998999999998,22.440001000000002,21.719998999999998,21.85,197500.0,21.722312 +ACAT,2015-11-05,21.83,21.879998999999998,21.209999,21.52,94800.0,21.39424 +ACAT,2015-11-06,21.35,22.059998999999998,21.27,21.950001,104800.0,21.821728 +ACAT,2015-11-09,21.799999,22.33,21.25,21.59,126500.0,21.463831 diff --git a/spark/src/main/scala/io/ddf/spark/content/PersistenceHandler.scala b/spark/src/main/scala/io/ddf/spark/content/PersistenceHandler.scala index d22be0c9..2197f944 100644 --- a/spark/src/main/scala/io/ddf/spark/content/PersistenceHandler.scala +++ b/spark/src/main/scala/io/ddf/spark/content/PersistenceHandler.scala @@ -7,11 +7,13 @@ import io.ddf.content.Schema import io.ddf.content.APersistenceHandler.PersistenceUri import io.ddf.util.Utils import io.ddf.util.Utils.JsonSerDes -import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.{DataFrame, Row, SchemaRDD} import io.ddf.spark.SparkDDFManager import io.ddf.content.IHandlePersistence.IPersistible + import scala.collection.JavaConversions._ import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD /** * author: daoduchuan @@ -57,6 +59,13 @@ class PersistenceHandler(ddf: DDF) extends BPersistenceHandler(ddf) { s"$directory/$name" } } + + def exportToJson(path: String) = { + val df = this.ddf.getRepresentationHandler.get(classOf[DataFrame]).asInstanceOf[DataFrame] + Utils.locateOrCreateDirectory(path) + df.write.json(s"$path/data") + } + override def load(namespace: String, name: String): IPersistible = { val schemaPath = this.getFolderPath(namespace, name, "schema") val dataPath = this.getFolderPath(namespace, name, "data") diff --git a/spark/src/main/scala/io/ddf/spark/etl/TimeSeriesHandler.scala b/spark/src/main/scala/io/ddf/spark/etl/TimeSeriesHandler.scala new file mode 100644 index 00000000..04fb6ea8 --- /dev/null +++ b/spark/src/main/scala/io/ddf/spark/etl/TimeSeriesHandler.scala @@ -0,0 +1,135 @@ +package 
io.ddf.spark.etl +import io.ddf.etl.ATimeSeriesHandler +import io.ddf.DDF +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{Column, DataFrame, Row} +import io.ddf.spark.{SparkDDF, SparkDDFManager} +import org.apache.spark.rdd.RDD + +import scala.collection.mutable +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types +import org.apache.spark.mllib.linalg.VectorUDT + +class TimeSeriesHandler(ddf: DDF) extends ATimeSeriesHandler(ddf) { + + override def addDiffColumn(timestampColumn: String, + tsIdColumn: String, + colToGetDiff: String, + diffColName: String): DDF = { + val sparkdf = ddf.getRepresentationHandler.get(classOf[DataFrame]).asInstanceOf[DataFrame] + + this.setTsIDColumn(tsIdColumn) + + val wSpec = if (mTsIDColumn != null && !mTsIDColumn.isEmpty) { + Window.partitionBy(tsIdColumn).orderBy(timestampColumn) + } else { + Window.orderBy(timestampColumn) + } + + val prev = lag(colToGetDiff, 1).over(wSpec) + + val newdf = sparkdf.withColumn(diffColName, sparkdf(colToGetDiff) - prev) + + val manager = ddf.getManager.asInstanceOf[SparkDDFManager] + val res = manager.newDDFFromSparkDataFrame(newdf) + res + } + + override def computeMovingAverage(timestampColumn: String, + tsIdColumn: String, + colToComputeMovingAverage: String, + movingAverageColName: String, windowSize: Int): DDF = { + + val sparkdf = ddf.getRepresentationHandler.get(classOf[DataFrame]).asInstanceOf[DataFrame] + + val halfWindowSize = Math.floor(windowSize / 2).toInt + + this.setTsIDColumn(tsIdColumn) + + val wSpec = if (mTsIDColumn != null && !mTsIDColumn.isEmpty) { + Window.partitionBy(tsIdColumn).orderBy(timestampColumn).rowsBetween(halfWindowSize - windowSize + 1, halfWindowSize) + } else { + Window.orderBy(timestampColumn).rowsBetween(halfWindowSize - windowSize + 1, halfWindowSize) + } + + val newdf = 
sparkdf.withColumn(movingAverageColName, avg(sparkdf(colToComputeMovingAverage)).over(wSpec)) + + val manager = ddf.getManager.asInstanceOf[SparkDDFManager] + val res = manager.newDDFFromSparkDataFrame(newdf) + res + } + + /** + * + * @param groupByColumns: columns to group by the data + * @param sortByColumn: column to sort the time series data by + * @param featureColumns: feature columns of the time series data + * @example + * stockSymbol | timestamp | feature_1 | feature_2 | feature_3 + * A | 1 | 100 | 100.1 | 10 + * A | 2 | 99 | 95 | 10.5 + * B | 1 | 60 | 61 | 100 + * B | 2 | 63 | 64 | 105 + * flatten(Array(stockSymbol), timestamp, Array(feature_1, feature_2, feature_3)) will return an RDD with the + * following structure + * stockSymbol | feature_size | features + * A | 3 | Array(100, 100.1, 10, 99, 95, 10.5) + * B | 3 | Array(60, 61, 100, 63, 64, 105) + * @return + */ + def flatten(groupByColumns: Array[String], sortByColumn: String, featureColumns: Array[String]): DDF = { + val dataFrame = ddf.asInstanceOf[SparkDDF].getRepresentationHandler.get(classOf[DataFrame]).asInstanceOf[DataFrame] + + val columns: Array[Column] = ((groupByColumns :+ sortByColumn) ++ featureColumns).map{ + col => dataFrame.col(col) + } + + val projectedDF = dataFrame.select(columns: _*) + + val sortByColumnIndex = projectedDF.schema.fieldIndex(sortByColumn) + + val groupByColumnIndexes = groupByColumns.map { + col => projectedDF.schema.fieldIndex(col) + } + val featureColumnIndexes = featureColumns.map{ + col => projectedDF.schema.fieldIndex(col) + } + + val featureSize = featureColumns.size + val rddRow = projectedDF.rdd + + val groupedRDD: RDD[(String, Iterable[Row])] = rddRow.groupBy( + row => + groupByColumnIndexes.map( + col => row.getString(col)).mkString(", ") + ) + + val resultRDD = groupedRDD.map { + case (key, values) => + val sortedValues = values.toArray.sortBy(row => row.getLong(sortByColumnIndex)) + val bigArray = mutable.ArrayBuilder.make[Double] + var i = 0 + while(i 
< sortedValues.size) { + val row = sortedValues(i) + var j = 0 + while(j < featureSize) { + val d = row.getDouble(featureColumnIndexes(j)) + bigArray += d + j += 1 + } + i += 1 + } + Row(key, featureSize, Vectors.dense(bigArray.result())) + } + val keyColumn = new StructField("key", types.StringType) + val featureSizeColumn = new StructField("featureSize", types.IntegerType) + val featuresColumn = new StructField("features", new VectorUDT) + val schema = StructType(Array(keyColumn, featureSizeColumn, featuresColumn)) + val sparkDDFManager = this.getManager.asInstanceOf[SparkDDFManager] + val df = sparkDDFManager.getHiveContext.createDataFrame(resultRDD, schema) + sparkDDFManager.newDDFFromSparkDataFrame(df) + } +} diff --git a/spark/src/test/java/io/ddf/spark/BaseTest.java b/spark/src/test/java/io/ddf/spark/BaseTest.java index bdecec0a..32a48a70 100644 --- a/spark/src/test/java/io/ddf/spark/BaseTest.java +++ b/spark/src/test/java/io/ddf/spark/BaseTest.java @@ -126,6 +126,22 @@ public void createTableCarOwner() throws DDFException { + ") ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '", "SparkSQL"); manager.sql("load data local inpath '../resources/test/carowner' into table carowner", "SparkSQL"); - } + } + + public static void createTableStocks() throws DDFException { + + manager.sql("drop table if exists stocks", "SparkSQL"); + manager.sql("create table stocks (Symbol string, Date string, Open double, High double, Low double, Close double, Volume double, AdjustedClose double)" + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','", "SparkSQL"); + + manager.sql("load data local inpath '../resources/test/quandl_stocks.csv' into table stocks", "SparkSQL"); + } + public void createTableStocksSmall() throws DDFException { + manager.sql("drop table if exists stocks_small", "SparkSQL"); + manager.sql("create table stocks_small (Symbol string, Date string, Open double, High double, Low double, Close double, Volume double, AdjustedClose double)" + + "ROW FORMAT DELIMITED FIELDS 
TERMINATED BY ','", "SparkSQL"); + + manager.sql("load data local inpath '../resources/test/quandl_stocks_small.csv' into table stocks_small", "SparkSQL"); + } } diff --git a/spark/src/test/java/io/ddf/spark/etl/TimeSeriesHandlerTest.java b/spark/src/test/java/io/ddf/spark/etl/TimeSeriesHandlerTest.java new file mode 100644 index 00000000..d54d7fdd --- /dev/null +++ b/spark/src/test/java/io/ddf/spark/etl/TimeSeriesHandlerTest.java @@ -0,0 +1,164 @@ +package io.ddf.spark.etl; + + +import com.google.common.base.Strings; +import io.ddf.DDF; +import io.ddf.exception.DDFException; +import io.ddf.spark.BaseTest; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.ddf.spark.SparkDDFManager; +import io.ddf.spark.content.PersistenceHandler; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.junit.Assert; +import org.junit.Test; +import org.junit.Ignore; +import org.junit.BeforeClass; +import com.google.common.collect.Lists; +import scala.Tuple2; + + +public class TimeSeriesHandlerTest extends BaseTest { + public static DDF stocks; + public static DDF aal_stocks; + + + @BeforeClass + public static void setupTestData() throws DDFException { + + createTableStocks(); + stocks = manager.sql2ddf("select unix_timestamp(concat(Date,' 00:00:00')) as unixts, * from stocks", "SparkSQL"); + aal_stocks = manager.sql2ddf( + "select unix_timestamp(concat(Date,' 00:00:00')) as unixts, * from stocks where Symbol='AAL'", "SparkSQL"); + } + + @Ignore + public void testDownSampling() throws DDFException { + + Assert.assertTrue(aal_stocks.getNumRows() == 30); + List aggFuncs = Lists.newArrayList("close_avg = avg(Close)"); + aggFuncs.add("volume_max = max(Volume)"); + DDF downsampled_aal = aal_stocks.getTimeSeriesHandler().downsample("unixts", aggFuncs, 5, TimeUnit.DAYS); + DDF downsampled_stocks = 
stocks.getTimeSeriesHandler().downsample("unixts", "symbol", aggFuncs, 5, TimeUnit.DAYS); + + Assert.assertEquals(3, downsampled_aal.getNumColumns()); + Assert.assertEquals(4, downsampled_stocks.getNumColumns()); + + Assert.assertTrue(downsampled_aal.getNumRows() == 8); + Assert.assertTrue(downsampled_stocks.getNumRows() == 80); + + } + + @Ignore + public void testDiff() throws DDFException { + Assert.assertTrue(stocks.getNumRows() == 300); + + String colToGetDiff = "close"; + String diffColName = "close_diff"; + DDF aalWithDiff = aal_stocks.getTimeSeriesHandler().addDiffColumn("unixts", null, colToGetDiff, diffColName); + Assert.assertEquals(10, aalWithDiff.getNumColumns()); + + DDF ddfWithDiff = stocks.getTimeSeriesHandler().addDiffColumn("unixts", "symbol", colToGetDiff, diffColName); + Assert.assertEquals(10, ddfWithDiff.getNumColumns()); + + // List rs1 = manager.sql(String.format("Select * from %s limit 12", + // ddfWithDiff.getTableName()), "SparkSQL").getRows(); + List col1 = ddfWithDiff.VIEWS.project(colToGetDiff).VIEWS.head(10); + double expected_diff = Double.parseDouble(col1.get(1)) - Double.parseDouble(col1.get(0)); + List diff_col = ddfWithDiff.VIEWS.project(diffColName).VIEWS.head(10); + double diff = Double.parseDouble(diff_col.get(1)); + Assert.assertEquals(expected_diff, diff, 1e-4); + } + + @Ignore + public void testMovingAverage() throws DDFException { + + String colToComputeMovingAverage = "volume"; + String movingAverageColName = "movingAvg"; + int windowSize = 3; + DDF ddfWithMA = stocks.getTimeSeriesHandler().computeMovingAverage("unixts", "symbol", colToComputeMovingAverage, + movingAverageColName, windowSize); + Assert.assertEquals(10, ddfWithMA.getNumColumns()); + + List col1 = ddfWithMA.VIEWS.project(colToComputeMovingAverage).VIEWS.head(10); + List ma_col = ddfWithMA.VIEWS.project(movingAverageColName).VIEWS.head(10); + double expected_ma = (Double.parseDouble(col1.get(0)) + Double.parseDouble(col1.get(1)) + Double.parseDouble(col1 + 
.get(2))) / windowSize; + double ma = Double.parseDouble(ma_col.get(1)); + + Assert.assertEquals(expected_ma, ma, 1e-4); + } + + @Test + public void testSavingData() throws DDFException { + this.createTableStocksSmall(); + DDF stocks = manager.sql2ddf("select unix_timestamp(concat(Date,' 00:00:00')) as unixts, * from stocks_small", "SparkSQL"); + String[] featureColumns = new String[] {"Open", "High", "Low", "Close", "Volume", "AdjustedClose"}; + String stockSymbolColumn = "Symbol"; + String timeStampColumn = "unixts"; + DDF ddf = ((TimeSeriesHandler) stocks.getTimeSeriesHandler()).flatten(new String[] {stockSymbolColumn}, + timeStampColumn, featureColumns); + RDD rows = (RDD) ddf.getRepresentationHandler().get(RDD.class, Row.class); + Row[] arrayRows = (Row[]) rows.collect(); + + for(Row row: arrayRows) { + Assert.assertEquals(row.getInt(1), 6, 0.0); + if(row.getString(0).equals("AAL")) { + Vector vector = (Vector) row.get(2); + double[] arrDouble = vector.toArray(); + Assert.assertEquals(arrDouble[0], 46.419998, 0.01); + Assert.assertEquals(arrDouble[1], 46.450001, 0.01); + Assert.assertEquals(arrDouble[2], 45.150002, 0.01); + Assert.assertEquals(arrDouble[3], 45.630001, 0.01); + Assert.assertEquals(arrDouble[4], 8081400.0, 0.01); + Assert.assertEquals(arrDouble[5], 45.505839, 0.01); + + Assert.assertEquals(arrDouble[6], 45.700001, 0.01); + Assert.assertEquals(arrDouble[7], 45.830002, 0.01); + Assert.assertEquals(arrDouble[8], 45.029999, 0.01); + Assert.assertEquals(arrDouble[9], 45.599998, 0.01); + Assert.assertEquals(arrDouble[10], 5864700.0, 0.01); + Assert.assertEquals(arrDouble[11], 45.475918, 0.01); + + Assert.assertEquals(arrDouble[12], 45.509997999999996, 0.01); + Assert.assertEquals(arrDouble[13], 45.970001, 0.01); + Assert.assertEquals(arrDouble[14], 45.049999, 0.01); + Assert.assertEquals(arrDouble[15], 45.34, 0.01); + Assert.assertEquals(arrDouble[16], 11669600.0, 0.01); + Assert.assertEquals(arrDouble[17], 45.216627, 0.01); + 
Assert.assertEquals(arrDouble.length, 24, 0.0); + } + + if(row.getString(0).equals("AAON")) { + Vector vector = (Vector) row.get(2); + double[] arrDouble = vector.toArray(); + Assert.assertEquals(arrDouble[0], 22.74, 0.01); + Assert.assertEquals(arrDouble[1], 24.58, 0.01); + Assert.assertEquals(arrDouble[2], 22.74, 0.01); + Assert.assertEquals(arrDouble[3], 24.48, 0.01); + Assert.assertEquals(arrDouble[4], 303100.0, 0.01); + Assert.assertEquals(arrDouble[5], 24.372931, 0.01); + + Assert.assertEquals(arrDouble[6], 24.42, 0.01); + Assert.assertEquals(arrDouble[7], 24.42, 0.01); + Assert.assertEquals(arrDouble[8], 23.83, 0.01); + Assert.assertEquals(arrDouble[9], 24.4, 0.01); + Assert.assertEquals(arrDouble[10], 211700.0, 0.01); + Assert.assertEquals(arrDouble[11], 24.293281, 0.01); + + Assert.assertEquals(arrDouble[12], 24.28000, 0.01); + Assert.assertEquals(arrDouble[13], 24.5, 0.01); + Assert.assertEquals(arrDouble[14], 23.4, 0.01); + Assert.assertEquals(arrDouble[15], 24.23, 0.01); + Assert.assertEquals(arrDouble[16], 240700.0, 0.01); + Assert.assertEquals(arrDouble[17], 24.124024, 0.01); + Assert.assertEquals(arrDouble.length, 24, 0.0); + } + } + } +}