Apache Spark - crash course


Working with NoSQL databases can be very inconvenient, as we lose even the basic tools to get insights into our data. For instance, answering very simple questions like “How many customers bought a specific product?” can be nearly impossible, depending on the data structure we built. Simply put, removing the GROUP BY, JOIN, and WHERE operators from a database would render it useless for ad-hoc queries. NoSQL databases have never had these operators in the first place, making them a non-trivial choice for ad-hoc data analytics.

Of course, if engineering had known up front what kind of queries would hit the system, they could have denormalized the data to the point where these very specific questions could be answered easily, but ad-hoc queries would still be impossible.

Spark to the rescue


Apache Spark is a very interesting concept: it is a distributed data access engine that can work on multiple underlying data stores, providing a consistent API for developers. For instance, it can run on Hadoop, Cassandra, and MongoDB; it can even work with simple shared file systems like NFS. For a full list, check out the Spark data access packages.

Running on top of the underlying data store, it provides both batch and streaming capabilities.

Another unique property of Spark is that it tries to keep intermediate data in memory for the next computations, unlike Hadoop, the traditional batch map-reduce framework, which writes the results of each stage to disk before starting the next one. Spark can spill the data to disk when memory is full, but it will try not to. The intermediate data caching (.cache) can be fine-tuned with the storage level parameters.
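As a rough sketch of what that looks like in the interactive shell (sc is the shell's SparkContext, the tables are the killr_video ones used later in this post, and the storage level below is just one possible choice):

import org.apache.spark.storage.StorageLevel

// keep the rows in memory only - this is what .cache defaults to (MEMORY_ONLY)
val videos = sc.cassandraTable("killr_video", "videos").cache

// or pick a storage level explicitly: serialized in memory, spilling to disk when memory runs out
val actors = sc.cassandraTable("killr_video", "videos_by_actor").
  persist(StorageLevel.MEMORY_AND_DISK_SER)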

Streaming operations


Spark can respond in a time-window fashion to incoming data, utilising all the computing nodes. This is a typical use case for dealing with large volumes of incoming data that doesn’t need to be stored permanently. Good examples would be webpage impressions or user clicks. However, Spark Streaming can be utilised for very complex operations too; for instance, Netflix built their recommendation engine on top of Spark Streaming.
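As a minimal sketch of this windowed style (not from the Netflix use case; the socket source, host, port, and window sizes below are made up for illustration):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// a streaming context on top of the existing SparkContext (sc), with 10 second micro-batches
val ssc = new StreamingContext(sc, Seconds(10))

// hypothetical source: one page impression URL per line on a local socket
val impressions = ssc.socketTextStream("localhost", 9999)

// impressions per URL over a sliding 60 second window, recomputed every 10 seconds
impressions.map(url => (url, 1L)).
  reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10)).
  print()

ssc.start()
ssc.awaitTermination()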

Batch operations


Apache Spark provides two basic ways to interact with it in a batch fashion - that is, read some existing data, transform it, then save or display it: the standard Spark Context API (here with the Cassandra connector), and the Hive API (for SQL queries). Both are available in a standalone app as well as on the interactive console, so we can write either Scala queries or standard SQL queries - JOIN and GROUP BY are back!

Whilst the SQL API is very convenient, it’s important to learn what you can, cannot, or shouldn’t do when running your queries. Since you are working on terabytes of data (otherwise, why bother with a hard-to-use NoSQL database in the first place?), some queries can exhaust the disk IO, the network, or the CPUs.

A typical batch operation starts with heavy CPU usage and large data transfers between the nodes; at the end of the job, when the aggregations happen, it shuffles lots of smaller packets using much less bandwidth.

Spark components


Driver - the node we are interacting with, the one that asks the cluster to execute the Job and gets the results back.
Job - the work that needs to be parallelized, broken down into Tasks on the cluster, and executed by the Workers.
Master - elected dynamically; responsible for distributing the work between the Workers.
Worker - a node that can run Spark jobs.
Executor - the actual process on the Worker that executes the Tasks and caches the intermediate results.
Task - a unit of independent work that an Executor can execute (see the short example after this list).
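As a rough illustration of how these pieces relate (the numbers are arbitrary and sc is the SparkContext of the interactive shell):

// an RDD with 8 partitions: each stage of a job over it is split into 8 tasks,
// which are handed out to the executors running on the worker nodes
val numbers = sc.parallelize(1 to 1000000, 8)

// calling an action submits a job from the driver to the cluster and brings the result back
numbers.map(_ * 2).count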

Starting the interactive shell


Either run:

$SPARK_HOME/bin/spark-shell

Or if you have DataStax Enterprise then:

dse spark

For a more complete configuration, here is how to run Apache Spark against a Cassandra cluster that has SSL enabled:

dse -u cassandraUsername -p cassandraPassword spark \
  --conf spark.cassandra.connection.ssl.enabled=true \
  --conf spark.cassandra.connection.ssl.trustStore.password=javaTrustStorePassword \
  --conf spark.cassandra.connection.ssl.trustStore.path=/etc/dse/cassandra/.truststore \
  --total-executor-cores=144 \
  --executor-memory 10g

In the example above we are asking for a total of 144 CPU cores to be allocated to the job across the cluster, and each executor should allocate exactly 10 gigabytes of memory up front. On a cluster with 18 nodes it will preallocate 8 cores and 10g on each box for this specific job. If I start another shell with the same parameters, it will allocate another set of 8 cores and another 10g on each node!

The default Spark shell uses Scala, so brushing up on Scala will be helpful.

Tracking your jobs


There are two important URLs to check to see how the jobs are running.
The master UI is available at
http://localhost:7080/
and displays the resource allocation and the currently running applications.

To track in detail what’s being run on the cluster, have a look at
http://localhost:4040/jobs

Also, the Apache Spark progress bar looks a bit unusual, so let’s have a quick look at it:

[Stage 0:===================================>      (403 + 136) / 601]

Stage shows which stage of the batch job is being run. Some batch jobs require only a single stage; others need to keep processing the data further - for instance filter, group, then sort. The latter will have more distinct stages, and the cluster must finish a stage completely before moving on to the next one.

On the progress bar above the cluster has completed 403 tasks out of 601 total tasks, and 136 cores are actively running a task (out of the 144 that we preallocated previously).

Running Spark Context - Scala crash course


The following examples can be executed on the freely available DataStax Spark virtual box, and most of them are self-explanatory, so I haven’t added much more description. The jobs below mostly operate on RDDs (Resilient Distributed Datasets), and only the results of the transformations are collected back to the driver, as an array for instance.

It’s important to understand that transformations and actions are very different: a transformation (filter(), map(), etc.) is a method that will only be executed later, when an action (collect(), take(), etc.) is performed on the RDD. Until then, it’s just a definition; for instance, the first line below doesn’t actually open the Cassandra table - that only happens on the second line, when the processing pipeline is triggered by the .take() action.

val movies = sc.cassandraTable("killr_video", "videos_by_actor")

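// the ten actors appearing in the most videos, as (count, actor name) pairs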
movies.select("actor_name").as( (name:String) => (name, 1)).
  reduceByKey(_ + _).map( e => (e._2, e._1)).sortByKey(false).take(10)

/////////

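// movies featuring both Benicio del Toro and Johnny Depp, via an inner join on video_id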
val moviesTH = movies.select("video_id", "release_year", "title").
  as( (id:String, r:Integer, t:String) => (id, t + " " + r)).
  where("actor_name = 'Benicio del Toro'")

val moviesJD = movies.select("video_id", "release_year", "title").
  as( (id:String, r:Integer, t:String) => (id, t + " " + r)).
  where("actor_name = 'Johnny Depp'")

moviesTH.join(moviesJD).mapValues{ case (title1, title2) => title1}.
  collect.foreach(println)

/////////

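// the same shared filmography, this time computed as an intersection of the two RDDs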
val moviesTH = movies.select("video_id", "release_year", "title").
  as( (id:String, r:Integer, t:String) => (id, t + " " + r)).
  where("actor_name = 'Benicio del Toro'")

val moviesJD = movies.select("video_id", "release_year", "title").
  as( (id:String, r:Integer, t:String) => (id, t + " " + r)).
  where("actor_name = 'Johnny Depp'")

moviesJD.intersection(moviesTH).collect.foreach(println)

/////////

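// look up the average rating of each Johnny Depp movie with a left outer join,
// list them sorted by rating, then save the result into a new Cassandra table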
val movieRatings = sc.cassandraTable("killr_video", "videos").
  select("video_id", "avg_rating").
  as( (id:String, r:Option[Float]) => (id, r))

val moviesJD = sc.cassandraTable("killr_video", "videos_by_actor").
  where("actor_name = 'Johnny Depp'").
  select("video_id", "title").
  as( (id:String, t:String) => (id, t))

val ratingsJD = moviesJD.leftOuterJoin(movieRatings).
  map{ case (id, (title, rating)) => (rating.get.get, title) }

ratingsJD.sortByKey(false).collect.foreach(println)

// CQL to run first (e.g. in cqlsh): CREATE TABLE killr_video.johnny_ratings(title text PRIMARY KEY, rating float);

ratingsJD.saveToCassandra("killr_video", "johnny_ratings", SomeColumns("rating", "title"))

/////////

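// take the 10 highest rated videos to the driver, parallelize them again, and join with the actors of each video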
val bestvideos = sc.cassandraTable("killr_video", "videos").
  select("video_id", "title", "avg_rating").
  as( (id:java.util.UUID, t:String, r:Option[Float]) => (id,(t, r.getOrElse(0.0f).asInstanceOf[Float])) ).
  sortBy( t => t._2._2, false).take(10)

val actors = sc.cassandraTable("killr_video", "videos_by_actor").
  select("video_id", "actor_name").
  as( (id:java.util.UUID, t:String) => (id, t))

val bestvideosrdd = sc.parallelize(bestvideos)

bestvideosrdd.join(actors).collect.foreach(println)

/////////

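// the same ratings lookup, this time keying the raw CassandraRow objects by video_id instead of mapping them to tuples first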
val movieRatings = sc.cassandraTable("killr_video", "videos").
  keyBy(row => row.getUUID("video_id"))

val moviesJD = sc.cassandraTable("killr_video", "videos_by_actor").
  where("actor_name = 'Johnny Depp'").keyBy(row => row.getUUID("video_id"))

val ratingsJD = moviesJD.leftOuterJoin(movieRatings).
  map{ case (id, (rowJD, rowM)) => (rowM.get.getFloat("avg_rating"),rowM.get.getString("title"))}

ratingsJD.sortByKey(false).collect.foreach(println)

/////////

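// a quick detour: parse a JSON string with Scala's built-in parser and read a field from the resulting Map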
import scala.util.parsing.json._

val obj = JSON.parseFull("{\"a\": 11}")

obj.get.asInstanceOf[Map[String, Any]]("a")

/////////

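// key the videos by release year, hash-partition and cache them,
// so the per-key operations below can reuse the partitioned data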
val movies = sc.cassandraTable("killr_video", "videos").
  keyBy( row => row.getInt("release_year") ).
  partitionBy(new org.apache.spark.HashPartitioner(2 * sc.defaultParallelism)).cache

movies.partitioner

movies.partitions.size

movies.countByKey.foreach(println)

movies.groupByKey.filter{ case (year, _) => year == 1942 }.collect.foreach(println)

/////////

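// cassandraCount pushes the counting down to Cassandra, so the rows are counted on the database side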
sc.cassandraTable("killr_video", "videos_by_actor").
  where("actor_name = 'Johnny Depp'").cassandraCount

/////////

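// the plain Spark count: the rows are fetched and counted by the executors instead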
sc.cassandraTable("killr_video", "videos_by_actor").
  where("actor_name = 'Johnny Depp'").count

/////////

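// spanByKey groups rows that arrive next to each other from the same Cassandra partition (no shuffle),
// while groupByKey shuffles the data across the cluster to do the same grouping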
sc.cassandraTable("killr_video", "videos_by_actor").
  select("title", "actor_name").
  as( (t:String, n:String) => (n, t)).spanByKey.take(10)

sc.cassandraTable("killr_video", "videos_by_actor").
  select("title", "actor_name").
  as( (t:String, n:String) => (n, t)).groupByKey.take(10)

/////////

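// join a locally built RDD with a Cassandra table; the connector only fetches the rows matching the join keys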
case class ActorYear(actor_name: String, release_year: Integer)

val actors2014 = 
  sc.parallelize(List(ActorYear("Johnny Depp", 2014), ActorYear("Bruce Willis", 2014)))

actors2014.joinWithCassandraTable("killr_video", "videos_by_actor").
  on(SomeColumns("actor_name", "release_year")).collect.foreach(println)

/////////

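// move each entry to a Spark node that is a Cassandra replica of its partition key, keeping the subsequent join local to the data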
case class Actor(actor_name: String)

val actors = 
  sc.parallelize(List(Actor("Johnny Depp"), Actor("Bruce Willis"))).
  repartitionByCassandraReplica("killr_video", "videos_by_actor")

actors.joinWithCassandraTable("killr_video", "videos_by_actor").takeSample(false, 10)

/////////

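// export actor_name and release_year as comma separated lines into a file on the driver node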
import java.io._

val er = sc.cassandraTable("killr_video","videos_by_actor")

val writer = new PrintWriter(new BufferedWriter(new FileWriter("/tmp/piglet2.txt")))

er.select("actor_name", "release_year").
  as( (ac:String, y:Int) => Array(ac, y).mkString(",")).
  collect.
  foreach(writer.println)

writer.flush

writer.close


Working with Cassandra SQL / DataFrames


The SparkContext interface and Scala are fun to work with, but in some cases it’s much more convenient to write a simple SQL query to achieve the same result (note: sqlContext was called csc in previous versions of the Spark interactive shell). When using the Hive API we will mostly work with DataFrames, as opposed to the RDDs in the samples above. DataFrames are more SQL-like data structures than the Scala/JVM-oriented RDDs, so it's easier to display and filter them.


sqlContext.sql("SELECT * FROM killr_video.videos_by_actor").take(10)

/////////

sqlContext.sql("SELECT * FROM killr_video.videos_by_actor").show

/////////

sqlContext.sql("SELECT count(*) FROM killr_video.videos_by_actor where actor_name = 'Johnny Depp'").show

/////////

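// set the default keyspace, then register the query result as a temporary table that later SQL statements can query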
sqlContext.setKeyspace("killr_video")

sqlContext.sql("SELECT count(*) FROM videos_by_actor where actor_name = 'Johnny Depp'").registerTempTable("jd_movies")

sqlContext.sql("SELECT * FROM jd_movies").show

/////////

sqlContext.sql("SELECT release_year, count(*) FROM killr_video.videos_by_actor group by release_year order by release_year desc").limit(5).show

/////////

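// explain prints the physical execution plan instead of running the query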
sqlContext.sql("SELECT release_year, count(*) FROM killr_video.videos_by_actor group by release_year order by release_year desc").limit(5).explain

/////////

import sqlContext.implicits._

case class ActorYear(actor_name: String, release_year: Integer)

val actors2014df = sc.parallelize(List(ActorYear("Johnny Depp", 2014), ActorYear("Bruce Willis", 2014))).toDF()

// the same DataFrame with the columns renamed; note that the examples below keep using
// actors2014df with the original actor_name / release_year column names
val actors2014dfRenamed = sc.parallelize(List(ActorYear("Johnny Depp", 2014), ActorYear("Bruce Willis", 2014))).toDF("name", "year")

actors2014df.show

actors2014df.filter("actor_name = 'Bruce Willis'").show

actors2014df.printSchema

actors2014df.schema

actors2014df.dtypes

actors2014df.first.getInt(1)

actors2014df.first.get(1).asInstanceOf[Integer]

Further reading


Apache Spark - all methods with samples 
Apache Spark Tuning - part 1
Apache Spark Tuning - part 2
