Vincent Stollenwerk

Making sense of Apache Spark Structured Streaming Sources

I'm currently looking into how custom data sources work in Apache Spark. However, I found the available resources quite confusing. Here are some notes I took along the way.

Old data source APIs

There have been three different versions of the streaming source API over time: the original Source API, the SourceV2 API, and the current API, which was introduced with Apache Spark 3.0.

Original source API

Originally, data sources were created by implementing the Source trait of the org.apache.spark.sql.execution.streaming package. Interestingly, the Source trait is still present in the Apache Spark 3.0 source code, but not in the API documentation.
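
As a rough, hypothetical sketch, implementing that trait looked something like the following skeleton (the class name is made up and the method bodies are elided, since the trait is internal API rather than a documented extension point):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType

// Hypothetical skeleton of a source built on the original (internal) Source trait.
class MyLegacySource extends Source {
  override def schema: StructType = ???                                       // schema of the produced data
  override def getOffset: Option[Offset] = ???                                // latest available offset, None if no data yet
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???  // data in the range (start, end]
  override def stop(): Unit = ???                                             // release resources
}
```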

SourceV2 API

Later, the SourceV2 API was introduced. As a result, data sources now had to implement the empty DataSourceV2 interface of the org.apache.spark.sql.sources.v2 package. Additionally, data sources had to implement a ReadSupport interface to support reading data. For streaming use cases this usually meant implementing the MicroBatchReadSupport interface.

The DataSourceV2 interface is no longer present in the Apache Spark 3.0 source code.

Current data source API (Spark 3.0)

With Apache Spark 3.0, which is the current version at the time of writing, the source API was changed again. Before we get into the implementation details, let's discuss some background on Spark data sources.

Background

The Spark data source API tries to be a unified API for both batch and streaming data sources. Nevertheless, there are some differences between implementing data sources for batch and streaming. As a result, data sources can have different capabilities. The set of possible capabilities is defined in the TableCapability enum. For streaming data sources, the following two capabilities are relevant:

  • MICRO_BATCH_READ: Signals that the data source can be used for micro-batch reads in streaming mode.
  • CONTINUOUS_READ: Signals that the data source can be used for continuous reads in streaming mode. Here, the data is read record by record.

Implementation

It looks like the data source API is used to implement both data sources and sinks. I find that naming very confusing. Not sure if I'm missing something.

Data Sources

Now data sources have to implement the TableProvider interface, usually in a class named DefaultSource.

This DefaultSource class is mainly concerned with creating an object that implements the Table interface. For reading, this table object also has to implement the SupportsRead interface, which itself extends the Table interface. Implementing the Table interface involves specifying the capabilities of the data source, e.g., MICRO_BATCH_READ or CONTINUOUS_READ for streaming data sources.
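
To make this more concrete, here is a minimal, hypothetical sketch. Apart from the interface names, everything (MyTable, its schema, the table name) is made up, and most method bodies are elided:

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import scala.collection.JavaConverters._

// Hypothetical entry point of the data source.
class DefaultSource extends TableProvider {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = MyTable.schema

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = new MyTable
}

// Hypothetical table that only supports micro-batch reads.
class MyTable extends Table with SupportsRead {
  override def name(): String = "my_table"
  override def schema(): StructType = MyTable.schema
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.MICRO_BATCH_READ).asJava

  // Entry point into the read path (see the next sketch).
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
}

object MyTable {
  val schema: StructType = StructType(Seq(StructField("value", StringType)))
}
```

As far as I can tell, Spark resolves a format name like com.example to the class com.example.DefaultSource, which is where the naming convention comes from.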

Additionally, the SupportsRead interface is concerned with creating a ScanBuilder object which in turn is responsible for creating a Scan object.
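
Continuing the hypothetical sketch, the ScanBuilder and Scan pieces could look like this:

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

// Hypothetical ScanBuilder; a real one could also mix in optional interfaces
// such as SupportsPushDownRequiredColumns if it supports pushdown.
class MyScanBuilder(schema: StructType) extends ScanBuilder {
  override def build(): Scan = new MyScan(schema)
}

// Hypothetical Scan; only the micro-batch path is sketched here.
class MyScan(schema: StructType) extends Scan {
  override def readSchema(): StructType = schema

  // Called by the engine when the source is used in a micro-batch streaming query.
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = ???
}
```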

Finally, this Scan object is responsible for creating either a MicroBatchStream or a ContinuousStream object, which is then used by the Spark engine to read data from the data source.
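
A hypothetical MicroBatchStream skeleton then mostly deals with offset bookkeeping and with planning the partitions that are read on the executors:

```scala
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}

// Hypothetical micro-batch stream; the engine drives the offset handshake and
// then asks for partitions plus a reader factory for each trigger.
class MyMicroBatchStream(checkpointLocation: String) extends MicroBatchStream {
  // Offset bookkeeping.
  override def initialOffset(): Offset = ???
  override def latestOffset(): Offset = ???
  override def deserializeOffset(json: String): Offset = ???
  override def commit(end: Offset): Unit = ???

  // Work distribution: one InputPartition per task, plus a factory that turns
  // a partition into a reader on the executor side.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = ???
  override def createReaderFactory(): PartitionReaderFactory = ???

  override def stop(): Unit = ???
}
```

The rows themselves are produced by PartitionReader instances, which the PartitionReaderFactory creates on the executor side.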

Data Sinks

Data sinks actually use the same TableProvider API and are implemented in a similar way. Instead of implementing the SupportsRead interface, data sinks implement the SupportsWrite interface. This also means specifying the STREAMING_WRITE capability for streaming data sinks.
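
Sketched in the same hypothetical style, a sink table looks much like its read counterpart:

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._

// Hypothetical sink table; note the STREAMING_WRITE capability.
class MySinkTable(tableSchema: StructType) extends Table with SupportsWrite {
  override def name(): String = "my_sink"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.STREAMING_WRITE).asJava

  // Entry point into the write path (see the next sketch).
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
}
```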

Analogous to the ScanBuilder and Scan objects for data sources, data sinks have to create a WriteBuilder which in turn has to return an object implementing the StreamingWrite interface.
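
A hypothetical WriteBuilder and StreamingWrite skeleton could look like this:

```scala
import org.apache.spark.sql.connector.write.{PhysicalWriteInfo, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}

// Hypothetical WriteBuilder; only the streaming path is overridden here.
class MyWriteBuilder extends WriteBuilder {
  override def buildForStreaming(): StreamingWrite = new MyStreamingWrite
}

// Hypothetical StreamingWrite; commit/abort are called once per epoch (micro-batch)
// with the messages produced by the individual DataWriters.
class MyStreamingWrite extends StreamingWrite {
  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = ???
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ???
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ???
}
```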

Finally, the StreamingWrite object has to create a StreamingDataWriterFactory, which in turn creates a DataWriter that is ultimately responsible for writing the data.
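
And, again only as a hypothetical skeleton, the last two pieces of the write path:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory

// Hypothetical factory; it is serialized and sent to the executors, where it
// creates one DataWriter per partition, task and epoch.
class MyWriterFactory extends StreamingDataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new MyDataWriter
}

// Hypothetical writer that receives the rows of one partition.
class MyDataWriter extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = ???   // buffer or send the row
  override def commit(): WriterCommitMessage = ???      // make this epoch's writes durable
  override def abort(): Unit = ???                      // roll back on failure
  def close(): Unit = ()                                // release any resources held by the writer
}
```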

References

I found much of the information on Spark 3.0 data sources in the following Stack Overflow answer:

Additionally, there is a blog post series by Madhukara Phatak on the Data Source V2 API in Spark 3.0. I've only read the part about the write API to understand the data sinks: