Lines 41 and 44 toggle lineage: comment out one and uncomment the other to turn lineage on or off. With lineage on, the program crashes; with lineage off, it works fine. I have attached the source code, the outputs with and without lineage, and the sample data. Is the crash caused by sortByKey?
sample-data.txt
AirQuality-WithLineage.txt
AirQuality-WithOutLineage.txt
package EPAairquality
import org.apache.spark.lineage.LineageContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import math.{atan2, cos, pow, sin, sqrt}
/**
 * Computes the monthly average of the hourly PM10 readings for EPA air-quality
 * rows that lie within a fixed radius of a fixed point and within a fixed,
 * inclusive range of years. Prints one line per year/month in ascending
 * (chronological) order.
 *
 * Arguments: `args(0)` is the input path; `args(1)` is parsed as an Int but its
 * value is not otherwise used. With fewer than two arguments the job runs on
 * `local[6]` against `data-epa/sample-data.txt`.
 */
object AverageQuality {

  /** Mean Earth radius in meters. */
  private final val EarthRadiusMeters = 6371e3

  /** Meters in one statute mile. */
  private final val MetersPerMile = 1609.34

  /**
   * Great-circle distance in miles between two points, using the haversine formula.
   *
   * @param lat1 latitude of the first point, in decimal degrees
   * @param lon1 longitude of the first point, in decimal degrees
   * @param lat2 latitude of the second point, in decimal degrees
   * @param lon2 longitude of the second point, in decimal degrees
   * @return distance in statute miles
   */
  private def haversineMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    // math.sin / math.cos take radians, so every angle is converted first.
    val phi1 = math.toRadians(lat1)
    val phi2 = math.toRadians(lat2)
    val halfDPhi = math.toRadians(lat2 - lat1) / 2
    val halfDLambda = math.toRadians(lon2 - lon1) / 2
    val a = pow(sin(halfDPhi), 2) + cos(phi1) * cos(phi2) * pow(sin(halfDLambda), 2)
    // Rounding can push `a` marginally above 1; clamp so sqrt never yields NaN
    // (a NaN distance would otherwise compare as "not outside" the radius).
    val c = 2 * atan2(sqrt(a), sqrt(math.max(0.0, 1 - a)))
    EarthRadiusMeters / MetersPerMile * c
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()

    // The second argument is still parsed as an Int, so non-numeric input fails
    // fast as before, but its value is not used anywhere below.
    val (logFile, _) =
      if (args.length < 2) {
        sparkConf.setMaster("local[6]")
        sparkConf.setAppName("Weather Analysis").set("spark.executor.memory", "2g")
        ("data-epa/sample-data.txt", 0)
      } else {
        (args(0), args(1).toInt)
      }

    // Search centre (decimal degrees), radius in miles, and inclusive year range.
    val centerLat = 37.773972
    val centerLon = -122.43129
    val radiusMiles = 500
    val yearStart = 2003
    val yearEnd = 2016
    val lineage = true

    val ctx = new SparkContext(sparkConf)
    val lc = new LineageContext(ctx)
    lc.setCaptureLineage(lineage)

    // Exactly one of the two lines below should be active. `ctx.textFile` reads
    // through the plain SparkContext. `lc.textFile` reads through the
    // LineageContext, presumably so that lineage is captured for this RDD.
    val csv = ctx.textFile(logFile)
    // val csv = lc.textFile(logFile)

    // Split each CSV line into trimmed fields and drop the header row(s).
    val rows = csv.map(_.split(",").map(_.trim))
    val headerKey = rows.first()(0)
    val data = rows.filter(_(0) != headerKey)

    // NOTE(review): assumes columns 5/6 are latitude/longitude in decimal degrees,
    // 9 is a Joda-parseable date and 13 is the PM10 reading; confirm against the data header.
    val cols = data.map(row => Array[String](row(5), row(6), row(9), row(13)))

    // Keep rows whose year lies in [yearStart, yearEnd]; the date is parsed once per row.
    val inYears = cols.filter { row =>
      val year = DateTime.parse(row(2)).getYear
      year >= yearStart && year <= yearEnd
    }

    // Keep rows within `radiusMiles` of the centre point.
    val nearby = inYears.filter { row =>
      haversineMiles(centerLat, centerLon, row(0).toDouble, row(1).toDouble) <= radiusMiles
    }

    // Key each reading by yyyymm (e.g. 200307 for July 2003).
    val byMonth = nearby.map { row =>
      val date = DateTime.parse(row(2))
      (date.getYear * 100 + date.getMonthOfYear, row(3).toDouble)
    }

    // Per-month mean, computed from (sum, count) pairs.
    val monthlyAvg = byMonth
      .mapValues(v => (v, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(sumCount => sumCount._1 / sumCount._2)

    // NOTE(review): whether sortByKey is responsible for the crash reported with
    // lineage enabled cannot be determined from this code alone.
    monthlyAvg.sortByKey().collect().foreach { case (yyyymm, avg) =>
      System.out.println(
        f"Year: ${yyyymm / 100}%d, Month: ${yyyymm % 100}%02d, Average Hourly PM10: $avg%02.2f;")
    }

    ctx.stop()
  }
}
Lines 41 and 44 toggle lineage: comment out one and uncomment the other to turn lineage on or off. With lineage on, the program crashes; with lineage off, it works fine. I have attached the source code, the outputs with and without lineage, and the sample data. Is the crash caused by sortByKey?
sample-data.txt
AirQuality-WithLineage.txt
AirQuality-WithOutLineage.txt
package EPAairquality
import org.apache.spark.lineage.LineageContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import math.{atan2, cos, pow, sin, sqrt}
/**
 * Stub entry point whose `main` does nothing.
 *
 * NOTE(review): this looks like a truncated, duplicated copy of the
 * `AverageQuality` object that appears earlier in the same paste. Two top-level
 * objects with the same name in one package cannot compile together; confirm
 * which copy is meant to be kept.
 */
object AverageQuality {

  /** Accepts any arguments and returns immediately without side effects. */
  def main(args: Array[String]): Unit = ()
}