Advantages of Apache Spark:
- Compatible with Hadoop
- Ease of development
- Multiple language support
- Unified stack: Batch, Streaming, Interactive Analytics
Transformation vs. Action:
- A transformation will return an RDD. Since RDDs are immutable, the transformation will return a new RDD.
- An action will return a value.
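A minimal Scala sketch of the distinction, assuming an existing SparkContext named sc (as in spark-shell):

```scala
// Transformations are lazy and return new RDDs; the final action triggers the computation.
val nums    = sc.parallelize(1 to 10)     // create an RDD
val doubled = nums.map(_ * 2)             // transformation: returns a new RDD, nothing runs yet
val evens   = doubled.filter(_ % 4 == 0)  // transformation: still lazy
val total   = evens.reduce(_ + _)         // action: runs the computation, returns an Int to the driver
```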
- The Spark core is a computational engine that is responsible for task scheduling, memory management, fault recovery and interacting with storage systems. The Spark core contains the functionality of Spark. It also contains the APIs that are used to define RDDs and manipulate them.
- Spark SQL can be used for working with structured data. You can query this data via SQL or HiveQL. Spark SQL supports many types of data sources such as structured Hive tables and complex JSON data.
- Spark Streaming enables processing of live data streams and real-time analytics.
- MLlib is a machine learning library that provides multiple types of machine learning algorithms such as classification, regression, and clustering.
- GraphX is a library for manipulating graphs and performing graph-parallel computations.
Local Mode
- driver & worker are in the same JVM
- RDD & variable in same memory space
- No central master
- execution started by user
Standalone/Yarn Cluster Mode
- the driver is launched from the worker process inside the cluster
- async: the client submits the job and does not need to wait for it to finish
Standalone/Yarn Client Mode
- the driver is launched in the client process that submitted the job
- sync: the client must wait for the job to finish
Mesos replaces the Spark Master as cluster manager and provides two modes:
- Fine-grained mode: each Spark task runs as a separate Mesos task; useful for sharing the cluster; incurs start-up overhead
- Coarse-grained mode: launches only one long-running Mesos task per node; no sharing; no start-up overhead
Spark provides transformations and actions; transformations are lazily evaluated, and nothing is computed until an action is called.
RDDs can be created from:
- Text files with one record per line > sc.textFile
- SequenceFiles > sc.sequenceFile
- Other Hadoop InputFormats > sc.hadoopRDD / sc.newAPIHadoopRDD
- (filename, content) pairs for a directory of small files > sc.wholeTextFiles
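A short sketch of these input APIs (paths are placeholders, sc is an existing SparkContext):

```scala
val lines = sc.textFile("hdfs:///data/input.txt")             // RDD[String], one record per line
val seq   = sc.sequenceFile[String, Int]("hdfs:///data/seq")  // RDD[(String, Int)] from a SequenceFile
val files = sc.wholeTextFiles("hdfs:///data/small-files")     // RDD[(filename, content)] pairs
```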
|map||Returns a new RDD by applying the given function to each element of the source|
|filter||Returns a new RDD consisting of the elements of the source on which the function is true|
|groupByKey||Returns a dataset of (K, Iterable<V>) pairs on a dataset of (K, V) pairs|
|reduceByKey||Returns a dataset of (K, V) pairs where the value for each key is aggregated using the given reduce function|
|flatMap||Like map, but each input item can be mapped to a sequence of output items instead of a single item|
|distinct||Returns a new dataset containing the distinct elements of the source|
|count()||Returns the number of elements in the RDD|
|reduce(func)||Aggregates the elements of the RDD using func (which must be commutative and associative)|
|collect()||Returns all elements of the RDD as an array to the driver program|
|take(n)||Returns an array with the first n elements|
|first()||Returns the first element|
|takeOrdered(n, [ordering])||Returns the first n elements of the RDD using their natural order or a custom comparator|
ParallelCollectionRDD is an RDD of a collection of elements with numSlices partitions and optional locationPrefs => the result of sc.parallelize or sc.makeRDD.
MapPartitionsRDD is an RDD that applies the provided function f to every partition of the parent RDD.
By default, it does not preserve partitioning — the last input parameter preservesPartitioning is false. If it is true, it retains the original RDD’s partitioning.
MapPartitionsRDD is the result of transformations such as map, flatMap, filter, glom, and mapPartitions.
CoGroupedRDD is an RDD that cogroups its pair RDD parents. For each key k in the parent RDDs, the resulting RDD contains a tuple with the list of values for that key.
Use RDD.cogroup(…) to create one.
HadoopRDD is an RDD that provides core functionality for reading data stored in HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI using the older MapReduce API (org.apache.hadoop.mapred).
HadoopRDD is created as a result of calling the following methods in SparkContext:
textFile (the most often used in examples!), hadoopFile, hadoopRDD, and sequenceFile
Partitions are of type HadoopPartition.
ShuffledRDD is an RDD of (key, value) pairs. It is the shuffle step (the result RDD) for transformations that trigger a shuffle at execution, such as the coalesce transformation with the shuffle input parameter set to true.
It can also be the result of pair-RDD transformations available through Scala implicits, e.g. reduceByKey, sortByKey, and partitionBy (only when the input partitioner is different from the current one in the RDD).
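A sketch showing which concrete RDD types the common calls produce; toDebugString on the final RDD prints the lineage with these types:

```scala
val parallel = sc.parallelize(1 to 100, numSlices = 4)          // ParallelCollectionRDD
val mapped   = parallel.map(_ * 2)                              // MapPartitionsRDD
val fromFile = sc.textFile("hdfs:///data/input.txt")            // backed by a HadoopRDD
val shuffled = mapped.map(i => (i % 10, i)).reduceByKey(_ + _)  // ShuffledRDD
println(shuffled.toDebugString)                                 // prints the lineage graph
```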
Commonly used actions for data frame
|collect||returns an array with all rows in the DF|
|count||returns the number of rows in the DF|
|describe(cols)||computes statistics for numeric columns, including count, mean, stddev, min, and max|
|first(), head()||returns first row|
|show()||displays the first 20 rows in tabular form|
|take(n)||returns the first n rows|
Commonly used functions for data frame
|cache()||caches the DF (same as persisting with the default storage level)|
|columns||returns all column names as an array|
|explain()||prints the physical plan to the console for debugging purposes|
|printSchema()||prints the schema to the console in a tree format|
|registerTempTable(tableName)||registers this DF as a temporary table using the given name|
|toDF()||returns a new data frame|
Commonly used language integrated [transformation] queries
|agg(expr, exprs)||Aggregates on the entire DF without groups|
|distinct||returns a new DataFrame containing only the distinct rows|
|filter(conditionExpr)||filters rows based on the given SQL expression|
|groupBy(col1, cols)||groups DF using specified columns|
|select(cols)||selects a set of columns based on expressions|
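A sketch combining these actions and queries, assuming the spark-shell sqlContext and a hypothetical JSON file with dept and salary columns:

```scala
import org.apache.spark.sql.functions._
import sqlContext.implicits._                 // enables the $"col" syntax

val df = sqlContext.read.json("hdfs:///data/employees.json")
df.printSchema()                              // schema as a tree
df.filter($"salary" > 1000)                   // language integrated query
  .groupBy($"dept")
  .agg(avg($"salary"), max($"salary"))
  .show()                                     // displays the first 20 rows
```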
Default project setting:
With sbt installed, run sbt package in the root of the project to compile the code and package it into a JAR.
PairRDD: key-value Pairs >
val pairRDD = rdd.map(data => (data(key), data(value)))
Many formats directly return a pairRDD: sequence files are read as pairRDDs, and sc.wholeTextFiles on small files creates a pairRDD.
- Transformations specific to pairRDD (see the sketch after this section)
reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]: combines values locally on each partition first, then sends at most one value per key to the shuffle to be reduced
groupByKey(): RDD[(K, Iterable[V])]: returns key/value pairs where the value is an iterable list; very expensive, use the two transformations below instead
aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]: given the initial value U, seqOp first combines U with each value V within a partition, then the resulting U values are combined across partitions by combOp
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]: uses createCombiner to transform the first value V of each key into a C, mergeValue to merge each subsequent V into the C, and finally mergeCombiners to merge the C values for each key across partitions
sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
- Transformations that work on two pair RDD
join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]: inner join > keeps only the keys present in both RDDs
cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]: full outer join > keeps all keys
subtractByKey(): left subtract join > keeps only the pairs from the left RDD whose keys are not in the other RDD
leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
- Transformations that apply to a plain RDD also apply to a pairRDD; each element is a tuple (K, V), so use ._1 to access the key and ._2 to access the value.
- Traditional actions on RDD available to PairRDD
countByKey(): Map[K, Long]: to handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead of a map > it is a transformation, so the result stays distributed
collectAsMap(): Map[K, V]: collect result as a map
lookup(key: K): Seq[V]: return all values associated with the provided key
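A sketch tying the pairRDD operations together (a word count plus a join against a small hypothetical lookup RDD):

```scala
val words  = sc.textFile("hdfs:///data/input.txt").flatMap(_.split(" "))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)   // combine locally, then shuffle
val labels = sc.parallelize(Seq(("spark", "framework"), ("hdfs", "storage")))
val joined = counts.join(labels)             // inner join: keeps only keys present in both RDDs
println(counts.lookup("spark"))              // action: Seq of values for the given key
println(joined.collectAsMap())               // action: Map on the driver (small results only)
```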
- The data within an RDD is split into several partitions.
- The system groups elements based on a function of each key so that sets of keys appear together on the same node
- Each machine in the cluster contains one or more partitions: the number of partitions should be at least as large as the number of cores in the cluster
- Two kinds of partitioning: Hash Partitioning, Range Partitioning
Apply the hash partitioner to an RDD using the partitionBy transformation at the start of a program; it shuffles all the data with the same key to the same worker.
The range partitioner shuffles the data for keys in the same range to the same worker; it is especially useful for a pair RDD whose keys have a defined ordering.
partitionBy(partitioner: Partitioner): RDD[(K, V)]
Specify partitions in transformations
Some operations accept an additional argument: numPartitions or the type of partitioner. Some operations automatically result in an RDD with a known partitioner, e.g. sortByKey (range partitioned) and groupByKey (hash partitioned):
To change partitioning outside of aggregations and grouping operations:
repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] shuffles data across the network to create a new set of partitions
coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] decreases the number of partitions; use
rdd.partitions.size to determine the current number of partitions
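A sketch of hash partitioning a pair RDD up front so that later per-key operations avoid repeated shuffles (the partition count 100 is an arbitrary example):

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.textFile("hdfs:///data/input.txt")
              .map(line => (line.split(",")(0), line))
val partitioned = pairs.partitionBy(new HashPartitioner(100))
                       .persist()             // keep the shuffled layout for reuse
println(partitioned.partitions.size)          // 100
```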
SparkSQL is a library that runs on top of the Apache Spark Core and provides DataFrame API. The Spark DataFrames use a relational optimizer called the Catalyst optimizer.
A DataFrame:
- is a programming abstraction in sparkSQL: a distributed collection of data organized into named columns that scales to PBs
- supports wide array of data formats & storage systems, can be constructed from structured data files, tables in Hive, external databases or existing RDDs > equivalent to a DB table but provides a much finer level of optimization
- has APIs in Scala, Python, Java and SparkR
- Infer schema by reflection
- convert an RDD containing case classes
- use when the schema is known in advance and has at most 22 columns - a limitation of case classes
- Construct schema programmatically
- use to construct a DF when the columns and their types are not known until runtime
- or when the number of fields exceeds 22
Inferring the schema by reflection is best when every record exposes the same fields in the same way.
Infer schema by reflection
In Python there is no toDF or case class; use
SQLContext.createDataFrame(data, schema=None, samplingRatio=None) over an RDD with:
|data||an RDD of Row/tuple/list/dict, a list, or a pandas.DataFrame|
|schema||a StructType or list of column names; default None|
|samplingRatio||the sample ratio of rows used when inferring the schema|
Construct schema programmatically
- Create an RDD of Rows from the original RDD
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1: StructType(Array(StructField(name, dataType, nullable),...))
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext
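A sketch of the three steps, assuming a text file of name,age lines:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rowRDD = sc.textFile("hdfs:///data/people.txt")
               .map(_.split(","))
               .map(p => Row(p(0), p(1).trim.toInt))        // Step 1: RDD of Rows
val schema = StructType(Array(
  StructField("name", StringType, nullable = false),        // Step 2: build the schema
  StructField("age", IntegerType, nullable = true)))
val peopleDF = sqlContext.createDataFrame(rowRDD, schema)   // Step 3: apply the schema
peopleDF.registerTempTable("people")
```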
- to JDBC:
df.write.jdbc(url: String, table: String, connectionProperties: Properties): Unit
Language integrated queries
agg(expr, exprs): aggregates on the entire DF without groups
except(other): returns a new DF with the rows from this DF that are not in the other DF
filter(expr): filter based on the SQL expression or condition
sort($"col1", $"col2".desc): sort the column(s)
- In Spark, a UDF can be defined inline, with no need for registration
- No complicated registration or packaging process
- 2 types of UDF
- to use with Scala DSL (with Data Frame Operations)
- to use with SQL
User defined functions in Scala DSL (Domain Specific Language)
In Scala use udf from org.apache.spark.sql.functions to wrap a function for use with DataFrame operations.
In Python use pyspark.sql.functions.udf.
User defined functions in SQL queries: register the function first, e.g. sqlContext.udf.register("name", func _), then refer to it by name inside the SQL statement.
N.B. The underscore tells Scala that we don’t know the parameter yet but want the function as a value. The required parameter will be supplied later.
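A sketch of both styles, using a hypothetical westernZone function and a customers table with a state column:

```scala
import org.apache.spark.sql.functions.udf

def westernZone(state: String): Boolean = Set("CA", "OR", "WA").contains(state)

// Scala DSL: define the UDF inline, no registration needed
val westernZoneUdf = udf(westernZone _)
df.filter(westernZoneUdf($"state")).show()

// SQL: registration required (note the underscore turning the method into a function value)
sqlContext.udf.register("westernZone", westernZone _)
sqlContext.sql("SELECT * FROM customers WHERE westernZone(state)").show()
```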
The number of partitions a DF uses after a shuffle is set by default in
spark.sql.shuffle.partitions as value 200, and can be changed by
sqlContext.setConf(key, value), i.e.
sqlContext.setConf("spark.sql.shuffle.partitions", partitionNumber: Int).
Repartitioning is possible via
df.repartition(numPartitions: Int), and the current number of partitions can be determined by df.rdd.partitions.size. Guidelines:
- each partition to be 50 MB - 200 MB
- small dataset is ok to have few partitions
- large cluster with 100 nodes should have at least 100 partitions, e.g. 100 nodes with 10 slots in each executor require 1000 partitions to use all executor slots.
An RDD lineage graph (aka RDD operator graph) is a graph of all the parent RDDs of an RDD. It is built as a result of applying transformations to the RDD and creates a logical execution plan:
rdd.toDebugString returns the lineage for an RDD.
No calculation is performed until an action while the lazy evaluation creates a Directed Acyclic Graph (DAG).
While applying an action, the scheduler outputs a computation stage for each RDD in the DAG. If an RDD can be computed from its parent without movement of data, the two are collapsed into a single stage, referred to as pipelining.
Situations in which the lineage is truncated (ordinarily the number of stages equals the number of RDDs in the DAG):
- RDD persisted in cluster memory or disk
- RDD materialized due to earlier shuffle: since shuffle outputs in Spark are written to disk => Built-in optimization
When an action is encountered, the DAG is translated into a physical plan to compute the RDDs needed for performing the action.
|tasks||unit of work within a stage|
|stages||group of tasks|
|shuffle||transfer of data between stages|
|jobs||work required to compute RDD, has one or more stages|
|pipelining||collapsing of RDDs into a single stage when RDD transformations can be computed without data movement|
|DAG||Directed Acyclic Graph: Logical graph of RDD operations|
|RDD||Resilient Distributed Dataset: Parallel dataset with partitions|
Phases during Spark Execution:
- User code defines the DAG
- Actions are responsible for translating the DAG into a physical execution plan
- Tasks scheduled and executed on cluster
- Skewed partition: some partition(s) take more time than others.
- Node Problem: some node(s) have issues
Common issues leading to slow performance:
- the level of parallelism: tune it via the numPartitions argument of shuffle operations, or with repartition/coalesce
- the serialization format used during shuffle operations:
- Spark serializes data during data transfer, so shuffle operations are affected
- Java's built-in serializer is the default; Kryo serialization is often more efficient
- managing memory to optimize your application:
- 60% RDD storage vs. 20% shuffle vs. 20% user program
- refer to the persist options:
MEMORY_ONLY_SER will cut down on garbage collection -> serialized in-memory cache
Best Practices and Tips
- avoid shuffling large amounts of data: prefer aggregations such as reduceByKey or foldByKey over a heavy shuffle like groupByKey
- do not copy all elements of an RDD to the driver: do not collect() large RDDs, use take(n)/takeSample or filter first
- filter sooner rather than later
- many idle tasks (~10k) -> coalesce into fewer partitions
- not using all slots in the cluster -> repartition into more partitions
- Data stream divided into batches
- Process data using transformations
- Output operations push data out in batches
Create Streaming Context
N.B.: the second parameter is batchDuration, the time interval at which streaming data is divided into batches.
After a context is defined, you have to do the following:
- Define the input sources by creating input DStreams
- Define the streaming computations by applying transformations and output operations to DStreams
- Start receiving data and processing it using streamingContext.start()
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination()
- The processing can be manually stopped using streamingContext.stop(); pass stopSparkContext = false to keep the SparkContext alive
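A sketch of the full lifecycle (the socket source on localhost:9999 and the 10-second batch interval are arbitrary choices):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingDemo")
val ssc  = new StreamingContext(conf, Seconds(10))   // second parameter = batchDuration
val lines = ssc.socketTextStream("localhost", 9999)  // 1. define an input DStream
val counts = lines.flatMap(_.split(" "))             // 2. define the computations
                  .map(w => (w, 1))
                  .reduceByKey(_ + _)
counts.print()                                       //    output operation
ssc.start()                                          // 3. start receiving and processing
ssc.awaitTermination()                               // 4. wait; ssc.stop() stops it manually
```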
- Data sources:
- File based: HDFS
- Network based: TCP sockets
- Twitter, Kafka, Flume, ZeroMQ, Akka Actor
- Transformations: create a new DStream > cf. RDD transformations
- Standard RDD operations: map, flatMap, filter, reduceByKey, join, ...
- Stateful operations: window operations, updateStateByKey, ...
- Output operations: trigger computation
saveAsHadoopFiles: save to HDFS
foreachRDD: do anything with each batch of RDDs
- Apply transformations over a sliding window of data
- window length [duration of the window] vs. sliding interval [interval at which the operation is performed]; normally the window length > the sliding interval
|window(windowLength, slideInterval)||Returns new DStream Computed based on windowed batches of source DStream|
|countByWindow(windowLength, slideInterval)||Returns a sliding window count of elements in the stream|
|reduceByWindow(func, windowLength, slideInterval)||Returns a new single-element stream created by aggregating elements over a sliding interval using func|
|reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])||Returns a new DStream of (K,V) pairs from a DStream of (K,V) pairs; values for each key are aggregated over windowed batches using the given reduce function|
|countByValueAndWindow(windowLength, slideInterval, [numTasks])||Returns a new DStream of (K,Long) pairs where the value of each key is its frequency within a sliding window; it acts on DStreams of (K,V) pairs|
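A sketch of a windowed word count, reusing the lines DStream from the streaming sketch above; counts cover the last 30 seconds and are recomputed every 10 seconds:

```scala
val windowedCounts = lines.flatMap(_.split(" "))
                          .map(w => (w, 1))
                          .reduceByKeyAndWindow((a: Int, b: Int) => a + b,
                                                Seconds(30),   // window length
                                                Seconds(10))   // sliding interval
windowedCounts.print()
```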
Fault Tolerance in Spark RDDs
- RDD is immutable
- Each RDD remembers lineage
- if RDD partition is lost due to worker node failure, partition recomputed
- Data in the final transformed RDD is always the same provided the RDD transformations are deterministic
- Data comes from fault-tolerant systems -> RDDs from fault-tolerant data are also fault tolerant
Fault Tolerance in Spark Streaming
- Spark Streaming launches receivers within an executor for each input source. The receiver receives input data that is saved as RDDs and then replicated to other executors for fault tolerance. The default replication for each input source is 2.
- The data is stored in memory as cached RDDs.
- If the node running the receiver fails, there may be a loss of data that was received by the system but not yet replicated to other nodes; the receiver will be restarted on a different node.
- With Write Ahead Logs (WAL) enabled, all received data is saved to log files in a fault-tolerant system before acknowledgment of receipt is sent, which protects against receiver failure.
Checkpointing
- Provides fault tolerance for the driver
- Periodically saves data to fault tolerant system
- Two types of check pointing:
- Metadata checkpointing > for recovery from driver failures
- Data checkpointing > needed for basic functioning when using stateful transformations
- Enable checkpointing > streamingContext.checkpoint(checkpointDirectory)
- If you use sliding windows, you need to enable checkpointing
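Enabling checkpointing is a single call on the context; the directory below is a placeholder and must live on a fault-tolerant file system:

```scala
ssc.checkpoint("hdfs:///spark/checkpoints")
```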
Graph: a way to represent a set of vertices that may be connected by edges.
Apache Spark GraphX:
- Spark component for graphs and graph-parallel computations
- Combines data-parallel and graph-parallel processing in a single API
- View data as graphs and as collections (RDD) > without the need for duplication or movement of data
- Operations for graph computation
- Provides graph algorithms and builders
Regular Graph: graph where each vertex has the same number of edges, e.g. users on Facebook.
Directed Graph: graph in which edges run in one direction from one vertex to another, e.g. Twitter follower.
Property graph: the primary abstraction of Spark GraphX; a directed multigraph which can have multiple edges in parallel; every edge and vertex has user-defined properties associated with it and is therefore unique > immutable, distributed and fault-tolerant. Characteristics:
- Edges and vertices have user-defined properties associated with them
- Property graphs are directional
- Every edge and vertex is unique
- Property graphs are immutable
Step 1. Import required classes
Step 2. Create vertex RDD
A vertex RDD is a tuple with a vertex ID, followed by an array of the properties of the vertex > (vertexID, case(properties))
Step 3. Create edge RDD
An edge in a property graph has a source ID, a destination ID and properties > build
tuple((srcID, dstID), case(properties)), then construct
Edge(srcID, dstID, case(properties)).
Step 4. Create graph
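A sketch of the four steps with two hypothetical users and one edge:

```scala
import org.apache.spark.graphx.{Edge, Graph}         // Step 1: import required classes

val vertices = sc.parallelize(Seq(                   // Step 2: (vertexID, properties) tuples
  (1L, ("alice", 28)),
  (2L, ("bob", 27))))
val edges = sc.parallelize(Seq(                      // Step 3: Edge(srcID, dstID, property)
  Edge(1L, 2L, "follows")))
val graph = Graph(vertices, edges)                   // Step 4: create the graph
println(graph.numEdges)                              // 1
println(graph.inDegrees.collect().mkString(","))     // (2,1)
```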
|Info||numEdges||number of edges (Long)|
|Info||numVertices||number of vertices (Long)|
|Info||inDegrees||The in-degree of each vertex (VertexRDD[Int])|
|Info||outDegrees||The out-degree of each vertex (VertexRDD[Int])|
|Info||degrees||The degree of each vertex (VertexRDD[Int])|
|Structure||vertices||An RDD containing the vertices and associated attributes|
|Structure||edges||An RDD containing the edges and associated attributes|
|Structure||triplets||An RDD containing the edges together with the attributes of their source and destination vertices (RDD[EdgeTriplet])|
|Structure||subgraph(epred, vpred)||Restricts graph to only vertices and edges satisfying the predicates|
|Structure||reverse||Reverses all edges in the graph|
|Structure||mask||The mask operator constructs a subgraph by returning a graph that contains the vertices and edges that are also found in the input graph|
|Property||mapVertices||Transforms each vertex attribute using map function|
|Property||mapEdges||Transforms each edge attribute, a partition at a time, using map function - does not pass the vertex value to the edge|
|Property||mapTriplets||Transforms each edge attribute, a partition at a time, using map function - does pass the vertex value to the edge|
In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties, yielding an
RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
|Algorithms and Utilities||Description|
|Basic statistics||summary statistics, correlations, hypothesis testing, random data generation|
|Classification and regression||linear models, decision trees, Naive Bayes|
|Collaborative filtering||model-based collaborative filtering using alternating least squares (ALS) algorithm|
|Clustering||Supports K-means clustering|
|Dimensionality reduction||singular value decomposition (SVD) and principal component analysis (PCA)|
|Feature extraction and transformation||common feature transformations|
Alternating Least Squares
- ALS approximates the sparse user-item rating matrix as the product of two dense matrices of rank K: a user factor matrix [U × K] and an item factor matrix [I × K] (the latent factor matrices).
- ALS tries to learn the hidden features of each user and item
- The algorithm alternately fixes one factor matrix and solves for the other, repeating until it converges.
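A sketch of training an ALS model with MLlib; the rank (K = 10), iteration count, regularization value, and the user,item,rating input format are arbitrary choices:

```scala
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = sc.textFile("hdfs:///data/ratings.csv").map { line =>
  val Array(user, item, rating) = line.split(",")
  Rating(user.toInt, item.toInt, rating.toDouble)
}
val model = ALS.train(ratings, rank = 10, iterations = 10, lambda = 0.01)
println(model.predict(user = 1, product = 42))   // predicted rating for one user/item pair
```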