Commit f1cfae9

Spark version update

1 parent 0277808

File tree

3 files changed: 28 additions & 10 deletions

  README.md
  build.sbt
  src/main/scala/com/techmonad/pipeline/DataPipeline.scala

README.md
Lines changed: 19 additions & 0 deletions

```diff
@@ -1 +1,20 @@
 # spark-data-pipeline
+
+#### Elasticsearch Setup
+i) [Download](https://www.elastic.co/downloads/elasticsearch) Elasticsearch 6.3.0 or a later version and unzip it.
+
+ii) Run the following command:
+
+    $ bin/elasticsearch
+
+
+
+#### Getting Started
+
+Clone and run in local mode:
+
+    $ git clone git@github.com:techmonad/spark-data-pipeline.git
+    $ cd spark-data-pipeline
+    $ sbt run
+
+
```
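Before `sbt run`, it is worth confirming that the node started with `bin/elasticsearch` is actually reachable. A minimal sketch in Scala, assuming the default HTTP port 9200 and no authentication (the `EsHealthCheck` object is illustrative, not part of the repo):

```scala
// Quick reachability probe for a local Elasticsearch node.
// Assumes the default HTTP port 9200 and no authentication.
object EsHealthCheck {
  def main(args: Array[String]): Unit = {
    // The root endpoint replies with a small JSON document containing
    // the cluster name and version when the node is up.
    val response = scala.io.Source.fromURL("http://localhost:9200").mkString
    println(response)
  }
}
```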

build.sbt
Lines changed: 5 additions & 4 deletions

```diff
@@ -2,14 +2,15 @@ name := "spark-data-pipeline"
 
 version := "1.0"
 
-scalaVersion := "2.11.8"
+scalaVersion := "2.11.11"
 
 
 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % "2.1.0",
-  "org.elasticsearch" %% "elasticsearch-spark-20" % "5.6.0",
+  "org.apache.spark" %% "spark-core" % "2.3.1",
+  "com.univocity" % "univocity-parsers" % "2.6.4",
+  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.3.0",
   "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" artifacts(Artifact("stanford-corenlp", "models"), Artifact("stanford-corenlp")),
   "ch.qos.logback" % "logback-classic" % "1.2.3",
-  "org.json4s" %% "json4s-native" % "3.5.0",
+  "org.json4s" %% "json4s-native" % "3.5.4",
   "org.scalatest" %% "scalatest" % "3.0.1"
 )
```
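The newly added `univocity-parsers` dependency is a standalone CSV parsing library, presumably backing the project's `CSVReader`. A minimal sketch of its core API, independent of the repo's actual reader code:

```scala
import java.io.StringReader
import scala.collection.JavaConverters._
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object CsvParseSketch {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true) // treat the first row as a header
    val parser = new CsvParser(settings)
    // parseAll returns a java.util.List[Array[String]], one array per data row
    val rows = parser.parseAll(new StringReader("id,text\n1,hello\n2,world"))
    rows.asScala.foreach(row => println(row.mkString(" | ")))
  }
}
```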

src/main/scala/com/techmonad/pipeline/DataPipeline.scala
Lines changed: 4 additions & 6 deletions

```diff
@@ -36,7 +36,7 @@ object DataPipeline {
     }
   }
 
-  private def applySource(source: Source)(implicit sc: SparkContext) = {
+  private def applySource(source: Source)(implicit sc: SparkContext): RDD[Record] = {
     CSVReader.read(source.path)
   }
 
@@ -54,16 +54,14 @@ object DataPipeline {
     transformations match {
       case Nil => rdd
       case head :: tail =>
-        applyTransformation(
-          Transformations.get(head).map { v => rdd.map(v.transform) }.getOrElse(rdd)
-          , tail)
+        applyTransformation(Transformations.get(head).map { v => rdd.map(v.transform) }.getOrElse(rdd), tail)
     }
 
-  private def applySchemaValidation(rdd: RDD[Record], validations: List[String]) = {
+  private def applySchemaValidation(rdd: RDD[Record], validations: List[String]): RDD[Record] = {
     applyValidation(rdd, validations)
   }
 
-  private def applySink(rdd: RDD[Record], sink: Sink) =
+  private def applySink(rdd: RDD[Record], sink: Sink): ESPersistenceRDD =
     sink.`type` match {
       case "ES" => new ESPersistenceRDD(rdd)
     }
```
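The collapsed `applyTransformation` reads as what it is: a recursive walk over a list of transformation names, mapping each resolvable transformation over the RDD and leaving the RDD unchanged for names the registry does not know. A self-contained model of that shape, with hypothetical `Record`, `Transformation`, and `Transformations` stand-ins for types this diff does not show:

```scala
import org.apache.spark.rdd.RDD

case class Record(data: Map[String, String]) // hypothetical stand-in

trait Transformation extends Serializable {
  def transform(record: Record): Record
}

object Transformations {
  // Hypothetical registry; the real project resolves names elsewhere.
  private val registry: Map[String, Transformation] = Map.empty
  def get(name: String): Option[Transformation] = registry.get(name)
}

object TransformationModel {
  // Recursively applies each named transformation in order; unknown
  // names are skipped rather than failing the pipeline.
  @annotation.tailrec
  def applyTransformation(rdd: RDD[Record], names: List[String]): RDD[Record] =
    names match {
      case Nil => rdd
      case head :: tail =>
        applyTransformation(
          Transformations.get(head).map(t => rdd.map(t.transform)).getOrElse(rdd),
          tail)
    }
}
```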

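The diff also pins `applySink`'s return type to `ESPersistenceRDD`, whose internals are not part of this commit. For reference, the standard way to write an RDD to Elasticsearch with the `elasticsearch-spark-20` connector (now at 6.3.0) is the implicit `saveToEs` extension; a minimal sketch, assuming a local node and an illustrative `documents/doc` index/type:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs

object EsSinkSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-sink-sketch")
      .setMaster("local[*]")
      .set("es.nodes", "localhost")        // the node from the README setup
      .set("es.port", "9200")
      .set("es.index.auto.create", "true") // create the index on first write
    val sc = new SparkContext(conf)
    val docs = sc.makeRDD(Seq(
      Map("id" -> "1", "text" -> "hello"),
      Map("id" -> "2", "text" -> "world")))
    docs.saveToEs("documents/doc") // index/type names are illustrative
    sc.stop()
  }
}
```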