Chapter 11:

Spark Structured Streaming

March 14, 2022
12 min read

Spark Structured Streaming

Apache Spark is one of the most commonly used analytics and data processing engines:it is fast, distributed, and doesn’t have I/O overhead like MapReduce. Additionally, it provides state management and offers delivery guarantees with fault tolerance. 

Spark has offered many APIs as it has evolved over the years. It started with the Resilient Distributed Dataset (RDD), which is still the core of Spark but is a low-level API that uses accumulators and broadcast variables. RDD API is excellent, but it’s low-level and can lead to performance issues due to serialization or memory challenges. 

Spark SQL is a Spark module for structured data processing with relational queries. You can interact with SparkSQL via SQL, Dataframe, or a Dataset API. In addition, Spark SQL provides more information about data and computation that lets Spark perform optimization. 

Spark streaming introduced Discretized Stream (DStream) for processing data in motion. Internally, a DStream is a sequence of RDDs. Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing. In the end, all the APIs are optimized using Spark catalyst optimizer and translated into RDDs for execution under the hood.

Spark Streaming Versus Structured Streaming

Spark Streaming is a library extending the Spark core to process streaming data that leverages micro batching. Once it receives the input data, it divides it into batches for processing by the Spark Engine. DStream in Apache Spark is  continuous streams of data. Spark polls the data after a configurable batch interval and creates a new RDD for the execution.

Figure 1: Spark Streaming divides the input data into batches (source)

Stream processing uses timestamps to order the events and offers different time semantics for processing events: ingestion time, event time, and processing time. Ingestion time is the time when an event has entered the streaming engine; all the events are ordered accordingly, irrespective of when they occurred in real life. Event time is the time that each individual event has occurred on its producing device. Processing time refers to the time of the compute node processing the individual event. 

RDDs are low-level APIs and have only ingestion time; they cannot do much for out-of-order events. On the other hand, Spark introduced Structured Streaming starting with Spark 2.x, which lets you apply all the SQL-like operations, unlike Spark Streaming. Also, it’s closer to real-time streaming as each row is processed and the outcome is appended to an unbounded results table.

Figure 2: The output of the streaming data as an unbounded table.

Spark Structured Streaming uses the Dataframe or Dataset APIs. Dataframe is a generic row type and has a higher level of abstraction from the RDDs. Datasets are the same as the Dataframe but provide type safety. Dataset is a declarative language that operates on domain objects and offers complex operations on data with a defined schema. 


Figure 3: Dataframe vs Dataset API

The Dataframe/Dataset API is more optimized and offers better development support than RDDs such as SQL functions. Spark cannot optimize the RDD, and it’s easy to build inefficient transformations using RDDs. Spark handles all the optimizations for Dataframe/Dataset and abstracts details away from the end-user. It is easy to use, supports more operations, and offers different notions of time for processing, such as event time.

Stateful geo-replicated stream processing keeps globally distributed data consistent
One integrated platform for streams, key values, docs, graphs, and search simplifies development
Declarative configuration using JavaScript and SQL avoids the need to learn a new syntax
Free Dev Account

Spark Structured Streaming

Spark Structured Streaming is a stream processing engine built on Spark SQL that processes data incrementally and updates the final results as more streaming data arrives. It brought a lot of ideas from other structured APIs in Spark (Dataframe and Dataset) and offered query optimizations similar to SparkSQL.

It is consistent because it provides a guarantee that application output will be equivalent to the execution of a batch job running on the input data processes at any time. Additionally, Spark Structured Streaming is fault-tolerant and manages interactions with the data source and sink (output), with semantics to handle out-of-order data events as well. For example, Spark will update results based on the received data if a data point is received late, you can filter and discard delayed data. The API is straightforward to use and has many similarities to SQL. 

The Spark Streaming application has three major components: source (input), processing engine (business logic), and sink (output). Input sources are where the application receives the data, and these can be Kafka, Kinesis, HDFS, etc. The processing or streaming engine runs the actual business logic on the data coming from various sources. Finally, the sink stores the outcome of the processed data, which can be an HDFS, a relational database, etc.

Case Study

To show Spark Streaming in action, we will follow an example of patient data at a hospital. The hospital has a requirement to aggregate the data to identify the busy days at the hospital’s different departments in order to predict busy days to plan staff accordingly. The patient’s visit history information sample is as follows (to keep the example simple, it isn't normalized).

PID Name DID DName VisitDate
2984641 Emily 35 cardio 01.09.2021
9454384 Riikka 86 ortho 21.07.2021
9266396 Fanny 86 ortho 03.06.2021
5247541 Urooj 35 cardio 21.08.2021

Table 1. Sample data of patients’ visit history at a hospital

The following is the code that reads the file as a stream.


customSchema = StructType() \
     .add("PID", IntegerType(), True) \
     .add("Name", StringType(), True) \
     .add("DID", IntegerType(), True) \
     .add("DName", StringType(), True) \
     .add("VisitDate", DateType(), True)

dfPatients = spark \
     .readStream \
     .format("csv") \
     .option("header",True) \
     .option("path","patients_visit_data.csv") \
     .schema(customSchema) \
     .load()
 

Spark can infer the scheme when reading from different sources, but you must specify the schema for structured streaming to ensure consistency. However, for ad-hoc use cases, users can rely on the schema inference of Spark.

Input Sources

Spark supports various input sources for data ingestion

  • File Source: File source is used for streaming data from a directory via DataStreamReader. Supported formats are text, CSV, JSON, and Parquet. The processing order of the files can be defined as well, such as latestFirst or lastModified. You can find a list of supported formats here.
  • Kafka Source: Data is read from Apache Kafka. This only supports Kafka version 0.10.0 or higher. 
  • Socket Source: It reads the data from a socket connection. This is mainly used for testing since it doesn’t support fault-tolerance guarantees.
  • Rate Source: This option generates random data with two columns: “timestamp” and “value.” Similar to the socket input source, this is recommended to be used for testing.

Sinks

Once all the computations are performed on the data, sinks store the output to external systems. They use DataStreamWriter via writeStream. Again, Spark offers quite a few sinks out of the box:

  • Console: Displays the results onto the console. This option is useful for application development and debugging but is not recommended in the production environment.
  • Memory: Stores the output to an in-memory table. This Sink is recommended to use only for development as it doesn’t offer fault tolerance.
  • File: Stores the resultant Dataframe into a file in a directory. The supported formats are JSON, orc, CSV, and Parquet.
  • Kafka: Publishes the results to a Kafka topic.
  • Foreach: Similar to the “foreach” loop in programming, this option helps you apply specific logic on each row while storing the data. 
  • ForeachBatch: Creates the output’s micro-batches and lets you apply custom logic on each batch for data storage.  

Output Modes

There is no finite output when dealing with streaming data; instead, we have a stream of output, requiring different semantics to store the data. Therefore, Spark uses different modes to display the results.

  • Append: You will see only newly processed rows in the output.
  • Update: Spark will output only updated rows. This is valid only if there are aggregation results; otherwise, this would be similar to Append mode. 
  • Complete: All the processed rows will be displayed. 

The following is a complete program that streams patients’ data, filters it for the ortho department, and sinks (output) the results onto the console.


	from pyspark.sql.types import IntegerType, DateType, StringType, StructType
	
	customSchema = StructType() \
		 .add("PID", IntegerType(), True) \
		 .add("Name", StringType(), True) \
		 .add("DID", IntegerType(), True) \
		 .add("DName", StringType(), True) \
		 .add("VisitDate", DateType(), True)
	
	#read the CSV file with headers and apply the schema
	dfPatients =  spark \
		 .readStream \
		 .format("csv") \
		 .option("header",True) \
		 .option("path","dept_data.csv") \
		 .schema(customSchema) \
		 .load()
	
	
	#Apply filters to get only patients from the ortho department
	orthoPatients = dfPatients.select("PID","Name").where("DID =86")
	
	
	#start the streaming of output data
	orthoPatients \
		 .writeStream \
		 .format("console") \
		 .start()
	 

Structured Streaming Operations

Structured Streaming APIs have similarities with the batch (Dataframe and Dataset) APIs, and most of the operations can be used as-is. In addition, all operations can be applied to the streaming Dataframes since Structured Streaming leverages the existing APIs. It supports untyped, typed (map, filter, and flat map) and SQL-like operations (select, where, groupBy, etc.). These transformations can be adding or removing rows or columns from the data or changing the order of rows based on the values of the columns. 

One of the most common operations is select. Select and selectExpr enable operations equivalent to SQL queries such as this one:


	dfPatients.select("name", col("name"), column("name"), expr("PID +3"))
	 

A column can be referred to in multiple ways while querying. For example, it can be referred to via the col or columns function or directly with the column name. Furthermore, we can create an expression based on the column as well. 

Rows can be filtered using the filter or where functions. Both of the following queries will return the same results:

	
	dfPatients.filter("DID = 86")
	dfPatients.where("DID = 86")
	 

Similarly, distinct can be used to extract the unique rows from the data. The following is a query to see all the distinct patients in the cardio department:

	
dfPatients.where("DID = 86").select("name").distinct()
	 

Some transformations help you modify columns. For example, WithColumn adds a new column to a Dataframe, withColumnRenamed renames a column, and drop will remove a column from the Dataframe. The following snippet uses an expression to calculate a new column that shows whether a patient recently visited the hospital:

	
dfPatients.withColumn("recent",year("VisitDate")>= 2021)
	 

If we want to see all the employees (ID, Name) who have visited the cardio department, we use a select with a where clause, as follows:


f.select("PID","Name").where("DID = 86")
 

Finally, aggregation operations are also supported. These can be used to summarize a complete Dataframe or apply a group-by or windowing operation. Some of the supported actions are listed below:

Collect Returns an array of all the rows in the Dataframe
count Counts the number of rows in the Dataframe
first Returns the first row of the Dataframe
head Returns the first row of the Dataframe
show Displays the top 20 rows of the Dataframe
take Returns n rows from the Dataframe
min/max Finds the min or max value from a Dataframe
sum Adds all the values in a column of a Dataframe
avg Calculates the average for a specific column in a Dataframe

Table 2. List of actions for Dataframes

Some more complex aggregation is possible using groupBy, where we group all the items based on a specific key (column) and apply the aggregation function. For example, in our dataset, we want to see how many times a patient has visited each hospital department, so we group on the employee and department and apply the count aggregation as follows:

	
dfPatients.groupBy("PID","Name","DID").agg(count("PID"))
 

Windowing on the Streams

Stream processing is not complete without windowing. Since data is infinite, we need to perform data aggregation periodically or over a specific interval. The group-by operation needs to see all the data before executing the aggregation function, so windows create an illusion of finite data with the notion of event time. Since windowing is similar to group-by, you use groupBy() and window() to express windowed aggregations. Spark supports three types of window aggregations: Tumbling, Sliding, and Session windows. 

Tumbling Window

A tumbling window is a fixed-sized and non-overlapping window where each element is bound to a single window. For example, if we have a window size of 10 seconds, as in Figure 4, all the incoming events during the 10-second interval will fall into that window. We can define a tumbling window with a date-time field and a specific gap. Spark will divide the input stream into fixed-size segments after each interval.

Figure 4. Tumbling windows with an interval of 10 seconds 

The following snippet calculates the visitors’ count monthly. It is a windowed aggregation with a tumbling window of 30 days and counts all the records.


dfPatients \
 .groupBy(window(col("VisitDate"), "30 days")) \
 .agg(count("PID").alias("aggregate_count")) \
 .select("window.start", "window.end", "aggregate_count")
 	 

The same query can also be written as follows:


 dfPatients \
  .groupBy( window(col("VisitDate"), "30 days"), col("PID")) \
  .count()
   

Sliding Window

A sliding window is similar to a tumbling window, but it is overlapping. Therefore, we need to provide a sliding offset and the interval for the definition of a sliding window. For example, if we have a window size of 10 seconds with a sliding offset of 5 seconds, windows will slide every 5 seconds to create a new window of 10 seconds, as shown in the figure.

Figure 5. Sliding window with an interval of 10s and sliding offset of 5s. (Source)

The following snippet calculates the monthly visitor count and emits the results every week. We have created a window on the visitDate for every 30 days and a sliding interval of a week. 


  dfPatients \
   .groupBy(F.window(F.col("VisitDate"), "30 days", "1 week")) \
   .agg(F.count("PID").alias("aggregate_count")) \
   .select("window.start", "window.end", "aggregate_count")
    

Session Window

Session windows have different semantics than tumbling and sliding windows because they look for elements that have occurred continuously. They can be of a dynamic size and depend on the incoming data. 

Session window boundaries are inactivity periods when there is no incoming data. They start when an input has been received and continue as long as we keep receiving data within the time interval equivalent to the window size. For example, if we have a session window of 5 seconds, the window is started as soon as we receive an input element. Afterward, all the events received within 5 seconds will belong to the window, which closes if we don’t receive any data for 5 seconds. As shown in Figure 6, we have our first window from 0-19 s. The second window starts only on the 26th second when we receive a new element. 

Figure 6. Session window with an interval of 5 seconds

Session windows are essential if we want to know who has been visiting the hospital at a certain point or what part of the year most of the patients were frequently visiting (within 30 days). 


   dfPatients \
	.groupBy( session_window(col("VisitDate"), "30 days"), col("PID")) \
	.count() \
	 
Platform
Spark
Flink
Macrometa
Batch
✔️
✔️
✔️
Complex Events
✔️
✔️
✔️
Streams
✔️
✔️
✔️
Geo-Replication
✔️
✔️
Database
✔️
Graphs
✔️
Free Dev Account
Platform
Batch
Complex Events
Streams
Geo-Replication
Database
Graphs
Spark
✔️
✔️
✔️
Flink
✔️
✔️
✔️
✔️
Macrometa
✔️
✔️
✔️
✔️
✔️
✔️

Spark Structured Streaming Limitations

Spark Structured Streaming solves many challenges, but it comes with some of its own. Although it offers streaming semantics, Spark is still not a native stream processing engine at its core. Instead, it’s a quicker batch operation that operates on smaller batches of the incoming data. As a result, it limits the performance of Structured Streaming compared to native stream-processing engines such as Flink and Macrometa.  

Furthermore, Spark overcame the performance issues of Apache Hadoop by processing data in memory. However, It stores the intermediate results between different stages on disk, reducing its performance compared to other processing engines that send the data directly to another node for the next stage of processing. Additionally, Spark offers limited options for state storage as there is only a single state store supported even though it can provide integration with other systems. 

Finally, Spark operators are stateless by default which is helpful for recovery and load balancing but presents a problem for supporting machine learning applications since most algorithms require state operators. To alleviate this shortcoming, Spark implemented some stateful operators with the mapWithState API in Spark 1.6 which automated some of the steps developers previously hand-coded to maintain state in custom applications.

Conclusion

Spark Structured Streaming introduced the notion of incremental output and expanded it into all libraries. It offers straightforward APIs and is easy to start with for someone who has worked with SQL in the past. It offers consistency guarantees and transactional integration with storage systems with its source and sinks. It also has an integration with the rest of the Spark ecosystem, such as integrating machine learning libraries or running interactive queries on the streaming state. However, it’s still not a native streaming processing platform and comes with challenges in the areas of performance and state management compared to modern streaming platforms conceived after Spark was introduced to the market in 2010.

Subscribe to our Linkedin Newsletter to recieve more educational content
Subscribe now