What Spark did next: A case study in product maturation

When Apache Spark 1.0 was released in mid-2014, it was quickly recognised for its compelling rethink of the conventions of large-scale data processing. Stepping away from the then-prevalent MapReduce mindset, Spark introduced the concept of the Resilient Distributed Dataset (RDD): an immutable multiset of data distributed across a cluster.

Spark transparently partitions and manages the target dataset across multiple executor nodes, coordinated through a master node. Data operations are expressed as a series of functional transformations applied to each executor's portion of the data. Execution is deferred until materialised output is actually required, which allows the chain of operations to be optimised before it runs.
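
This lazy-then-eager model is easiest to see in a few lines of code. The following is a minimal sketch (assuming a SparkContext named sc, e.g. spark.sparkContext): the transformations merely record intent, and nothing executes until an action is called.

val nums = sc.parallelize(1 to 1000000)   // distribute a local collection across the cluster
val evens = nums.filter(_ % 2 == 0)       // transformation: recorded, not executed
val doubled = evens.map(_ * 2)            // still lazy; the chain keeps growing
val total = doubled.sum()                 // action: the whole optimised chain runs now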

Spark provides resilience by tracking the application of data operations to the distributed dataset. Node failure is then handled by re-running the sequence on the affected data subset on a fresh node.
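
This recorded chain of operations (the lineage) can be inspected directly. A small sketch, assuming a SparkContext named sc and the CSV path from the setup further below:

val delays = sc.textFile("flight-data/16157900_T_ONTIME.csv")
  .map(_.split(","))
  .filter(_.length > 6)
println(delays.toDebugString)   // prints the lineage Spark would replay to rebuild lost partitions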

Spark 2.0 followed roughly two years later, in July 2016. Comparing the two milestone releases offers insight into how a successful open-source project remains relevant: by studying the preferences of its users and potential users, and by re-evaluating and reworking its fundamental tenets.

The biggest change lies front and centre. RDDs continue to underpin Apache Spark, and code written around them remains unaffected. Their use, however, has been de-emphasised and relegated to specialised scenarios. Spark textbooks can now be dated by the point at which they introduce RDDs: the older references present the concept up front, while the later ones defer it to later chapters.

The following selective list of key observations and feature rollouts from the two years leading up to Spark's current form provides some insight into this fundamental shift.

1. Tuning of RDD operations

Although RDDs simplified operations on large volumes of data, tuning those operations remains a non-trivial process with potential stumbling points. Understanding the interplay of lazily and eagerly executed RDD operations, and the inter-node data traffic they cause, is crucial. The Spark UI, served on port 4040 of the driver, provides workflow visualisations and quantitative data that are vital for understanding and optimising operations.
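
A quick way to observe this interplay, as a sketch (assuming a SparkContext named sc and the CSV path from the setup below):

val parsed = sc.textFile("flight-data/16157900_T_ONTIME.csv").map(_.split(","))
parsed.cache()    // lazy: marks the RDD for caching, nothing is computed yet
parsed.count()    // eager action: triggers read, parse and cache, and appears as a job in the Spark UI
parsed.count()    // a second action reuses the cached partitions, visible on the UI's Storage tab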

2. Hardware trends

In the past, storage and network were the biggest impediments to fast data operations. This informed Spark's initial focus on I/O efficiency, network locality, and in-memory operations and caching.

However, processor speeds have not advanced at the same rate as network and storage over the last few years, making compute efficiency a new priority in data operations optimisation.

3. Project Tungsten (introduced in release 1.4)

The Java Virtual Machine (JVM) provides a robust, performant and proven foundation for Spark’s architecture. Project Tungsten, which spanned several releases, was a response to those specific JVM features that hindered Spark performance.

Tungsten introduced binary processing, which circumvents the Java object model by using a custom binary memory layout. This significantly reduces garbage collection pressure for data-intensive workloads.

Tungsten also included a new code generation framework, which generates optimised bytecode at runtime for common tasks in user code, such as date handling and string manipulation.
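
The effect of code generation is visible in a query's physical plan. As a small sketch (assuming the DataFrame df from the samples below): in Spark 2.0's explain output, operators fused by whole-stage code generation are prefixed with an asterisk.

df.filter("delay > 0")
  .select("origin", "dest", "delay")
  .explain()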

4. Project Catalyst (introduced in release 1.3)

Spark has offered SQL querying over its data since version 1.0, but the Catalyst optimiser reworks each query through a multistage process to produce an efficient physical query plan, compiling it down to Java bytecode where possible.
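
Catalyst's stages can be inspected with an extended explain. A minimal sketch, again assuming the DataFrame df defined in the samples below: explain(true) prints the parsed and analysed logical plans, the optimised logical plan, and the physical plan chosen for execution.

df.filter("delay > 0")
  .groupBy("origin", "dest")
  .avg("delay")
  .explain(true)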

5. DataFrames (introduced in release 1.3)

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, data is organised into named columns, much like a table in a relational database. Because Spark can infer the schema, there is no need to use Java serialisation to encode the data. Spark can instead serialise the data into off-heap storage in a binary format, using the functionality introduced with Tungsten, and then perform transformations directly. This avoids the garbage collection costs associated with constructing individual objects for each row in the data set.
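
A quick check of the inferred schema, assuming the DataFrame df from the setup below:

df.printSchema()                               // column names and types inferred from the CSV
df.select("origin", "dest", "delay").show(5)   // executed against Tungsten's binary rows, not per-row Java objects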

The DataFrame API sits atop the Spark SQL engine. Catalyst is therefore able to optimise queries through this API.

6. Users

From the outset, Spark 1.0 targeted a large portion of data scientists by supporting one of their favoured languages, Python. Release 1.4 added support for R, another language important to this user group. Both Python and R users are familiar with the concept of the DataFrame structure, now a key underpinning of data operations in both languages.

Using RDDs in Python in version 1.0 incurred a performance penalty. DataFrames in version 2.0 offer similarly optimised performance for all four Spark-supported languages: R, Python, Scala and Java.

The ongoing outreach to the data science community has lifted the proportion of Spark usage in R and Python significantly between versions 1.0 and 2.0.

7. Datasets (introduced in release 1.6)

The DataFrame construct can be viewed as a collection of Rows, where a Row is a generic untyped JVM object.

The Dataset construct, by contrast, is a collection of strongly-typed JVM objects, defined by a class in one of Spark’s supported compile-time, type-safe languages (Scala or Java).

This gives Spark a field-level understanding of the type of objects being processed. This extended insight allows better serialisation of strongly-typed JVM objects.

The user now has an alternative to the RDD approach. RDDs operate on unstructured data, applying arbitrary functions written by the user; however, a non-trivial tuning process usually ensues. Because the data structure and user functions are opaque to Spark, its optimisation scope is restricted to the sequencing of operations.

DataFrames and Datasets trade off some of the user’s freedoms for performance. The overlaying of an inferable schema on the data allows Spark to apply more of its hard-won performance optimisations from Tungsten and Catalyst at a deeper level.

Datasets occupy a middle ground between RDDs and DataFrames. Applying class typing to data confers compile-time safety (detecting, for example, numerical operations being applied to text fields), and allows access to both relational DataFrame operations and higher-order functions (map, flatMap, filter, etc).
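
The compile-time safety and the mix of typed and relational operations can be sketched against the FlightDetails Dataset (ds) defined in the samples below:

// ds.filter(_.delay > "sixty")            // would not compile: a Double compared with a String
val longDelays = ds.filter(_.delay > 60)   // typed higher-order function
  .map(f => (f.origin, f.dest))            // result is still a Dataset, with relational operations available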

The trade-off is that this flexibility introduces JVM types and functions that cannot be optimised by Catalyst (Tungsten optimisations are still available to Datasets). Datasets are therefore not quite as performant as DataFrames. The JVM type underpinning means that Datasets are not available for R and Python users.

Overall, DataFrames should be preferred where possible, and Datasets used where their benefits justify it. For example, applying a Dataset at the last step of a pipeline allows the results to be collected as typed objects and manipulated with strong typing. As mentioned earlier, compile-time type safety can be beneficial during data ingestion. Single-node programmatic data manipulation solutions can also be migrated to Spark with little friction if typed case classes are used consistently in both environments.
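
A hypothetical sketch of the "typed last step" pattern, reusing the flight DataFrame df from the samples below and the avg function from org.apache.spark.sql.functions:

case class RouteDelay(origin: String, dest: String, avgDelay: Double)

val typedResults: Array[RouteDelay] = df.filter("delay > 0")
  .groupBy("origin", "dest")
  .agg(avg("delay").as("avgDelay"))   // keep the heavy lifting in the DataFrame API
  .as[RouteDelay]                     // cast to a typed Dataset only at the end
  .collect()                          // strongly-typed results on the driver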

RDDs are now relegated to specific scenarios: where the data is unstructured, where data types are complex and cannot be serialised by Tungsten's encoders, or where fine-grained control of the computational steps is needed.

The Scala code samples that follow all run the same set of operations on a BTS flight dataset, illustrating the different approaches Spark supports.

Setup
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local").appName("flightDataApp").getOrCreate()
import spark.implicits._
val df = spark.read
  .option("header", "true")          // read column names from the header row
  .option("mode", "DROPMALFORMED")   // drop rows that fail to parse
  .option("inferSchema", "true")     // sample the data to infer column types
  .csv("flight-data/16157900_T_ONTIME.csv")
  .na.fill(Map("delay" -> 0.0))      // replace null values in the delay column with zero

Using a DataFrame
val resultDF = df.filter("delay > 0")    // keep only delayed flights
  .groupBy("origin", "dest")             // one group per route
  .avg("delay")                          // average delay per route
  .sort(col("avg(delay)").desc)          // worst routes first

Using a Dataset
import org.apache.spark.sql.expressions.scalalang.typed.{avg => typedAvg}

case class FlightDetails(year: Int, month: Int, day_of_month: Int, airline_Id: Int, origin: String, dest: String, delay: Double)

val ds = df.as[FlightDetails]                         // typed view over the same DataFrame

def averageDelay = typedAvg[FlightDetails](_.delay)   // type-safe aggregator
  .name("average_delay")

val resultDS = ds.filter(_.delay > 0)
  .groupByKey(x => (x.origin, x.dest))
  .agg(averageDelay)
  .sort($"average_delay".desc)

Using an RDD
val rDD = df.as[FlightDetails].rdd                      // drop down from the Dataset to its underlying RDD
val resultRDD = rDD.filter(x => x.delay > 0)
  .map(x => ((x.origin, x.dest), (x.delay, 1)))         // key by route, carry (delay, count)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // sum delays and counts per route
  .mapValues(z => z._1 / z._2)                          // average = total delay / count
  .sortBy(_._2, ascending = false)

Using SQL
df.createOrReplaceTempView("flights")
val resultSQL = spark.sql("""SELECT origin, dest, avg(delay)
    FROM flights
    WHERE delay > 0
    GROUP BY origin, dest
    ORDER BY avg(delay) DESC""")

8. Crossing the Streams

Spark has several modules that sit atop the Core package. Reworking these to move away from RDDs in favour of DataFrames brings follow-on benefits: conceptual familiarity, easier and more performant queries, and the Tungsten and Catalyst optimisations.

As an example, Spark's machine learning API has been rebased on the DataFrame API, with the RDD-based ML APIs moving towards deprecation. DataFrames, and the ML pipelines now built on top of them, follow usage patterns that Python and R users already know.
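
A minimal sketch of a DataFrame-based pipeline, assuming the flight DataFrame df from the samples above and treating a couple of its numeric columns as features (not a meaningful model, just the shape of the API):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression

val assembler = new VectorAssembler()
  .setInputCols(Array("month", "day_of_month"))   // assemble numeric columns into a feature vector
  .setOutputCol("features")
val lr = new LinearRegression()
  .setFeaturesCol("features")
  .setLabelCol("delay")
val model = new Pipeline().setStages(Array(assembler, lr)).fit(df)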

Another update is in Spark's graph processing toolkit. The RDD-based GraphX API is being complemented by GraphFrames, a DataFrame-based package in which vertices and edges are represented as DataFrames and arbitrary data can be stored with each vertex and edge.
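
A small sketch using the flight DataFrame df (this assumes the separate graphframes package is available on the classpath):

import org.graphframes.GraphFrame

val vertices = df.select(col("origin").as("id"))
  .union(df.select(col("dest").as("id")))
  .distinct()                                                                             // airports as vertices
val edges = df.select(col("origin").as("src"), col("dest").as("dst"), col("delay"))       // flights as edges
val graph = GraphFrame(vertices, edges)
graph.inDegrees.show()   // e.g. the number of inbound flights per airport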

Important Principles for any Open-Source Project

The Apache Spark project provides a case study in that most challenging of undertakings: building the second major release of a capable product in high demand. The foregoing observations can be summarised into principles critical for the successful stewardship of any open-source project:

  • know how real-world trends impact capabilities
  • understand the mindset and preferences of users, both current and potential
  • objectively assess users’ pain points, no matter how revolutionary and ahead of the competition the product was at inception, and
  • manage a roadmap that channels the contributions of many into parallel streams of development augmenting each other.
