Chapter 8:

Apache Beam Tutorial

October 23, 2021
18 min read

Apache Beam Tutorial

Various technologies are available to solve big data processing challenges, such as Hadoop, Apache Spark, Apache Samza, and Apache Flink. However, each has pros and cons, making it a challenge to choose the right one for a particular use case. For example, some solutions are suitable for real-time processing, while others have better integration with various data sources.

Figure 1 shows the evolution of distributed data processing over the years. It all started with MapReduce and two ecosystems: open source and Google. These ecosystems evolved independently but, unfortunately, were isolated and not interoperable until Apache Beam came out. Beam combines the best of both worlds: It lets developers build language-agnostic data pipelines that can run on any processing engine, either open source or Google.

Figure 1. Evolution of Apache Beam. [Source]

Code portability is the key feature of Apache Beam—it is purpose-built to process the pipelines from any number of beam processing engines. It is a unified programming model that supports the development of portable big data pipelines. The unified model lets developers write both batch and streaming code in one go. In contrast, other platforms provide different APIs, such as Resilient Distributed Datasets (RDD) for batch processing, datasets for streaming in Apache Spark - and data sets for batch processing, and DataStream for streaming in Apache Flink. (The name “Beam” actually comes from combining “Batch” and “Stream.”) 

Apache Beam pipelines are executed using runners. Before execution, a runner translates the Apache Beam code into the API compatible with the execution engine of your choice, making it easy to switch to different execution engines for each specific use case. For example, if the team decides to use another processing engine, they can reuse the existing code instead of starting a new development project from scratch; with Apache Beam, the change can be accomplished with just config changes. 


Apache Spark, Apache Flink, Apex, Google Dataflow, and Apache Samza are some of the well-known frameworks supported by Beam at the moment. In addition, Java, Python, and Go are supported programming languages. Apache Beam's capability matrix shows the capabilities of the individual runners.

Apache Beam Concepts

Some of the common Apache Beam concepts referred to frequently in this article are the following:

  • Pipeline: A definition of an end-to-end data processing job. All the operations, inputs, and outputs are defined in the scope of a pipeline. It is also possible to configure where and how to run a pipeline. 
  • PCollection: An immutable collection of elements of a specific type T (similar to Spark’s RDD<T>), identical to RDD in Apache Spark or DataSet/DataStream in Apache Flink. It can contain either a bounded or an unbounded number of elements. A PCollection is the input and output for each PTransform.
  • PTransform: A PTransform is an operation that needs to be performed on a single data element. It takes an input PCollection and transforms it into zero or more output PCollections.
  • Runner: A runner translates the beam pipeline into the compatible API of the chosen distributed processing backend, such as Direct Runner, Apache Flink, or Apache Spark.

Apache Beam Programming Model

An Apache Beam pipeline is an ordered graph of different operations (transformations) for a data processing workflow. It is a combination of a PCollection and a PTransform. 

A PCollection needs to have a watermark, and each element has a timestamp associated with it. This information is used to process an infinite stream and differentiates between timestamps that have been processed already. 

Finally, a runner refers to the data processing engine (e.g., Spark or Flink) that will execute the Apache Beam code. Apache Beam code is translated into the runner-specific code with the operators supported by the processing engines. 

In a nutshell, the Apache Beam pipeline is a graph of PTransforms operating on the PCollection 

There are three considerations when developing an Apache Beam pipeline, as shown in the table below. 

Name Description Example Transforms
Input How or where is your input data stored, and how are you going to read it? ReadFromText
Transformations What transformations are required? For example, do general beam operators meet the data transformation needs, or is it necessary to write custom transformers using ParDo? Map
Filter
ParDo
Output What will the output format be, and where will it be stored so that you can decide what transforms need to be applied? WriteToText

The following diagram shows a simple pipeline with all the components we have discussed so far. First, it reads the data from a table using a read transformation into a PCollection. Then a transform or operation is applied. Finally, it uses the operation on each element of the PCollection, which lets you apply different functions depending on the data element’s nature. Also, multiple transforms can be applied to the same PCollection, generating multiple PCollections, which can be merged. Finally, an output collection is returned to the database using a write transform. 

Figure 2. Apache Beam pipeline components [Source]

For example, an employee count pipeline will read the data in a PCollection. Then a count transform will be applied, and a PCollection containing the employee count will be generated. Finally, the output collection will be written to the database using write transform. 

Transforms

Apache Beam can be installed on the local machine, but it requires a Python installation, multiple dependencies, and more. A more straightforward solution is to use the Google Colab—an interactive environment that allows anybody to develop using Python in a browser. Google Colab doesn't require any setup and runs virtually in the cloud. It provides Jupyter notebooks for writing code and is free to use. Here is an example of setting up Apache Beam in the Colab and running a word count program. 

After creating a new notebook in Google Colab, it will have Python already set up, so only Apache Beam will need to be installed. Any instructions surrounded by "!{}" will be executed on the shell, so the following command will install the latest version of Apache Beam, making the environment ready for development. 

!{pip install --quiet apache-beam}

For the rest of the article, we will use an example involving patient data at a major hospital. In this example, the hospital has a requirement to aggregate the data to identify the busy days for each department. In addition, the hospital’s management wants to predict these busy days to plan staffing requirements accordingly. The following table presents a sample from the patient’s visit history information (for the sake of simplicity, it isn't normalized).

patient_id patient_name dept_id dept_name visit_date
2984641 Emily 35 cardio 1/9/21
9454384 Riikka 86 ortho 21-07-2021
9266396 Fanny 86 ortho 3/6/21
5247541 Urooj 35 cardio 21-08-2021

We assume that the data is stored in a text file, which we will use as the input. Since we are going to process a static file, it will be an example of batch processing. Still, the same rules apply for the stream processing as well, where instead of reading from the static file, we stream the data from a stream source such as Apache Kafka, Google Pub/Sub, or some database.

The first question the hospital needs to answer is how many times a patient visits the hospital, so we need to aggregate the data. The following code shows a complete example of counting visits for the cardio department.

import apache_beam as beam

p1 = beam.Pipeline()
visit_count = (
 
  p1
  |beam.io.ReadFromText('data.txt')

  |beam.Map(lambda record: record.split(','))
  |beam.Filter(lambda record: record[3] == 'cardio')
  |beam.Map(lambda record: (record[1], 1))
  |beam.CombinePerKey(sum)
 
  |beam.io.WriteToText('out_data.txt')
)

p1.run()

Listing 1. Example: Patient visit count for the cardio

department with core transforms

Initially, we import the beam namespace and create an alias to it. Then we create the pipeline 'p1' and a PCollection visit_count, which will store the results of all the transformations.

Pipeline p1 applies all the transforms in sequential order. It starts with reading the data using read transform into a collection and then splitting the row into columns, applying the filter for the cardio department, and creating a collection of records for that department only. Next, we make a pair of each patient, which then can be combined and summed up. Finally, a group by operation on patient_id with a sum operation will give us the required results.

Transforms

Transforms are operations that are applied to individual elements of the pipeline and provided as a function. There are quite a few transforms provided by the Apache Beam. Some predefined transforms are Map, FlatMap,  and Filter, but one can create custom transforms from the “PTransform” class.

I/O Transform

Apache Beam provides built-in support for various I/O formats like file formats, messaging systems, databases, and file systems. The supported file formats are text, Avro, Parquet, S3, and Google cloud storage. All read/write transforms are under the IO namespace.

It is possible to create a PCollection using “beam.Create()”, which lets you create various types, e.g., lists, sets, or dictionaries. The following example creates a list of integers and then writes them into a file.

ilines = (
  p1
  |beam.Create([1,2,3])
  |beam.io.WriteToText('outfile.txt')
)

Listing 2. Creating a PCollection using the beam.Create()

Finally, similar to a read transform, we have a corresponding write transform, which writes the output of the Apache Beam pipeline.

Filter Transform

This is self-explanatory: Given a condition, the transform filters out all elements that don't satisfy it. A Filter transform can also be used to filter based on inequality with a given value based on the comparison ordering of the element. Here is an excellent Google Colab showing different variations of the Filter transform. 

Map/FlatMap Transform

A Map transform converts a PCollection of N elements into another PCollection of N elements. A FlatMap can have zero or more outputs—it takes a PCollection of N elements and returns N collections of zero or more elements that are flattened into a final PCollection. 

For example, if we have a list of three elements, applying the same lambda operation using Map will produce a collection of three lists, while FlatMap will produce a collection of six individual elements, as follows:

beam.Create([1, 2, 3]) | beam.Map(lambda x: [x, 'macroMeta']) 

# Three lists output: [[1, 'macroMeta'], [2, 'macroMeta'], [3, 'macroMeta']]

beam.Create([1, 2, 3]) | beam.FlatMap(lambda x: [x, 'macroMeta']) 

#Six single elements output: [1, 'macroMeta', 2, 'macroMeta', 3, 'macroMeta']

Listing 3. Differences in output from equivalent Map and FlatMap transforms.

 

ParDo Transform

ParDo is a generic parallel processing transform. It is similar to Map because it takes a collection as input, does some processing, and returns the output. However, It can return zero, one, or multiple elements of different types, which is different than Map/FlatMap, where one cannot change the element type. It is also used for slicing and dicing collections. 

As ParDo is a generic transform, it is used to write more complex and customized transforms. So, we can achieve Map, FlatMap, and filtering functionality using ParDo as well. 

The following code listing shows how to calculate the number of visitors to the cardio department, like in our previous example, but using the ParDo transform this time.

import apache_beam as beam

class SplitRow(beam.DoFn):
def process(self, element):
  return  [element.split(',')]

class FilterCardioPatients(beam.DoFn):
def process(self, element):
  if element[3] == 'cardio':
    return [element]
 
class PairPatients(beam.DoFn):
def process(self, element):
  return [(element[1], 1)]
class Counting(beam.DoFn):
def process(self, element):
  (key, values) = element
  return [(key, sum(values))]
 

p1 = beam.Pipeline()

visit_count = (
 
  p1
  |beam.io.ReadFromText('dept_data.txt')
 
  |beam.ParDo(SplitRow())
  |beam.ParDo(FilterCardioPatients())
  |beam.ParDo(PairPatients())
  |beam.GroupByKey()
  | beam.ParDo(Counting())
 
  |beam.io.WriteToText('parddo_output.txt')
)

p1.run()

Listing 4. Patient visit count for the cardio department using the ParDo transform

ParDo accepts a DoFn object, which contains the processing logic for each element of the input collection. It executes the user function on the distributed systems in parallel. We can define custom logic by overriding the process method of the DoFn class. Map and FlatMap also inherit from the DoFn, so we can use ParDo for any use cases to build more complex transforms. 

In the above code listing, Map and Filter are replaced with ParDo, which uses the same code logic. You will notice that we are returning a list from the process method because it can produce more than one element. If we return a list of elements, it will act like Map; otherwise, it will behave like FlatMap and create multiple elements.

Pipeline Branching

So far, we have seen the primary example of a pipeline where everything is flowing smoothly. However, most pipelines will be more complex in real life, where a transform can simultaneously produce multiple collections. For example, let’s adjust our use case, supposing we now need to count the visitors for two different departments. In this case, the transform will produce two outputs through branching in the beam pipeline, as shown in Figure 3.

Figure 3. Pipeline branching [Generated with draw.io]

The Beam runner will take the pipeline and create the DAG (Directed acyclic graph) out of it before execution. PCollection will be the nodes, and PTransforms will be the edges of the DAG. Once the DAG is generated, the runner decides the sequence of parallel execution of the functions. In our example, Apache Beam will execute both pipelines in parallel because they are independent. 

Each operation can be labeled in the pipeline code, which helps for debugging purposes—especially when you have complex pipelines—to figure out which transform is causing the issue. Each label needs to be unique; otherwise, Apache Beam will throw an error. 

The following is the code listing of the pipeline shown in figure 3, where we have two separate output collections and operations are labeled as well.


import apache_beam as beam


p = beam.Pipeline()

input_collection = (

    p
    | "Read input data" >> beam.io.ReadFromText('dept_data.txt')
    | "Split rows into columns" >> beam.Map(lambda record: record.split(','))
                  )

cardio_count = (
    input_collection
    | 'Filter cardio patients' >> beam.Filter(lambda record: record[3] == 'cardio')
    | 'Pair each patient with 1' >> beam.Map(lambda record: ("cardio, " +record[1], 1))
    | 'Aggegate cardio' >> beam.CombinePerKey(sum)
    )

ortho_count = (
              input_collection
              | 'Get all ortho patients' >> beam.Filter(lambda record: record[3] == 'ortho')
              | 'Pair each ortho patient with 1' >> beam.Map(lambda record: ("ortho, " +record[1], 1))
              | 'Aggregate Ortho' >> beam.CombinePerKey(sum)
          )

output =(
        (cardio_count,ortho_count)
  | beam.Flatten()
  | beam.io.WriteToText('data/both')
)

 Listing 5. Patient visit count for multiple departments with pipeline

branching

We can store the output of both transforms in separate files, but Apache Beam provides a transform called flatten that can combine multiple collections of the same data type. It merges various PCollectons into a single logical PCollection, similar to a union operation. In our case, both collections have the same data types and number of columns, allowing them to be flattened together. Flatten works on a tuple, so we create a tuple of collections and then call the transform and write the results to the file.

Composite Transforms

Apache Beam allows nested transformations, where a transform can perform multiple other transforms, such as ParDo, Combine, etc. These are called composite transforms. Hence, a composite transforms is a transform that has a series of transforms combined in a helpful processing pattern.

In our example, we calculated the number of visits for two different departments. If we want to add additional criteria, such as filtering the patients who have visited more than once, we will have to add this extra code in both places. Such changes are error-prone, and the resulting code is hard to maintain due to code duplication, leading to bugs. Instead, we will create a composite transform, like a reusable method for both use cases.  

Our code after using composite transform will be as follows:


import apache_beam as beam

class CustomTransform(beam.PTransform):
  def expand(self, input_coll):
 
  a = (
      input_coll
                      | 'Group and sum' >> beam.CombinePerKey(sum)
                      | 'filter regular patients' >> beam.Filter(filter_on_count)
                      | 'Regular patient' >> beam.Map(format_output)
           
  )
  return a

def SplitRow(element):
  return element.split(',')

def filter_on_count(element):
 name, count = element
 if count > 1:
  return element
 def format_output(element):
 name, count = element
 #return ', '.join((name.encode('ascii'),str(count),'Regular employee'))
 return ', '.join((name,str(count),'Regular Patient'))

p = beam.Pipeline()

input_collection = (

              p
                    | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')
                    | "Split rows" >> beam.Map(SplitRow)
                  )

cardio_count = (
                    input_collection
                    | 'Get all cardio patients' >> beam.Filter(lambda record: record[3] == 'cardio')
                    | 'Pair each patient with 1' >> beam.Map(lambda record: ("cardio, " +record[1], 1))
                    | 'composite cardio Patients' >> CustomTransform()
                    | 'Write results for cardio' >> beam.io.WriteToText('cardio_output.txt')
                )

ortho_count = (
              input_collection
              | 'Get all HR dept persons' >> beam.Filter(lambda record: record[3] == 'ortho')
              | 'Pair each hr employee with 1' >> beam.Map(lambda record: ("ortho, " +record[1], 1))
              | 'composite Ortho Patients' >> CustomTransform()
              | 'Write results for Ortho' >> beam.io.WriteToText('ortho_output.txt')
          )
p.run()

Listing 6. Patient visit count for multiple departments using a composite transform.

We have inherited the “CustomTransform” from the PTransform base class. All the core transforms, like Map and FlatMap, inherit from the PTransform class as well. “CustomTransform” overrides the expand method of the base class and defines the processing logic, which applies a sequence of core transforms.

Recommendations

We recommend reading Streaming 101 and 102 by Tyler Akidau, the brain behind the Apache Beam project and highly respected in the data processing community for his contributions. Our own content on the topic of streaming processing is another great source of information before you get started. 

We also recommend learning by practicing with the examples that we provided in this article. You can find our code examples in a Google Colab notebook

Apache Beam Katas is an interactive way to learn application development using Apache Beam. It is a collection of interactive coding exercises to develop the Apache Beam concepts and programming model. It is purpose-built to solve problems using beams with increasing difficulty. It was created with JetBrains’ Educational products and is available in Java, Python, Go, and Kotlin SDKs. One can install the Edu version of IntelliJ or PyCharm and search for the “Beam Katas" course. 

Macrometa’s integrated edge computing platform  complement’s Beam to accelerate successful project delivery. The Beam pipeline operates as a workflow manager that can use Macrometa’s noSQL database, pub/sub, and event processing services. Macrometa’s Global Data Network also offers a 50ms P90 round trip response time anywhere in the world. 

Learn more about Macrometa’s technology