Making sense of Apache Spark Structured Streaming Sources
I'm currently looking into how custom data sources work in Apache Spark. However, I found the available resources on the topic quite confusing. Here are some notes I took along the way.
Old data source APIs
There have been three different versions of the streaming source API over time: the original Source API, the SourceV2 API, and the current API, which was introduced with Apache Spark 3.0.
Original source API
Originally, data sources were created by implementing the Source trait of the org.apache.spark.sql.execution.streaming package. Interestingly, the Source trait is still present in the Apache Spark 3.0 source code, but it is no longer part of the API documentation.
SourceV2 API
Later, the SourceV2 API was introduced. As a result, data sources now had to implement the empty DataSourceV2 interface of the org.apache.spark.sql.sources.v2 package. Additionally, data sources had to implement a ReadSupport interface to support reading data. For streaming use cases this usually meant implementing the MicroBatchReadSupport interface.
The DataSourceV2 interface is no longer present in the Apache Spark 3.0 source code.
Current data source API (Spark 3.0)
With Apache Spark 3.0, which is the current version at the time of writing, the source API was changed again. Before we get into the implementation details, let's discuss some background on Spark data sources.
Background
The Spark data source API tries to be a unified API for both batch and streaming data sources. Nevertheless, there are some differences between implementing data sources for batch and for streaming. As a result, data sources can declare different capabilities. The set of possible capabilities is defined in the TableCapability enum. For streaming data sources, the following two capabilities are relevant:
MICRO_BATCH_READ: Signals that the data source can be used for micro-batch reads in streaming mode.
CONTINUOUS_READ: Signals that the data source can be used for continuous reads in streaming mode. Here, the data is read record by record.
Implementation
It looks like the data source API is used to implement both data sources and data sinks. I find that naming quite confusing, but maybe I'm missing something.
Data Sources
Now, data sources have to implement the TableProvider interface, usually in a class named DefaultSource. This DefaultSource class is mainly concerned with creating an object that implements the Table interface. Data sources also have to implement the SupportsRead interface, which itself extends the Table interface.
Implementing the Table interface involves specifying the capabilities of the data source, e.g., MICRO_BATCH_READ or CONTINUOUS_READ for streaming data sources.
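To make this more concrete, here is a minimal sketch of the provider and table against the Spark 3.0 connector API. Apart from the conventional DefaultSource name, all class names and the single-column schema are made up for illustration; only the interfaces and method signatures come from Spark.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point of the data source; Spark resolves this class from the name passed to format().
class DefaultSource extends TableProvider {

  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    ExampleTable.schema

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    new ExampleTable
}

object ExampleTable {
  // Made-up single-column schema, just for illustration.
  val schema: StructType = StructType(Seq(StructField("value", StringType)))
}

// The Table describes the data and declares the capabilities of the source.
class ExampleTable extends Table with SupportsRead {

  override def name(): String = "example"

  override def schema(): StructType = ExampleTable.schema

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.MICRO_BATCH_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new ExampleScanBuilder // defined in the next snippet
}
```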
Additionally, the SupportsRead interface is concerned with creating a ScanBuilder object, which in turn is responsible for creating a Scan object.
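Continuing the sketch, a minimal ScanBuilder and Scan might look like this (ExampleScanBuilder, ExampleScan, and ExampleMicroBatchStream are again made-up names):

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

class ExampleScanBuilder extends ScanBuilder {
  // A more realistic builder would handle things like column pruning here.
  override def build(): Scan = new ExampleScan
}

class ExampleScan extends Scan {

  override def readSchema(): StructType = ExampleTable.schema

  // Only called because the table declared the MICRO_BATCH_READ capability.
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
    new ExampleMicroBatchStream(checkpointLocation) // defined in the next snippet

  // A CONTINUOUS_READ source would override toContinuousStream instead.
}
```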
Finally, this Scan object is responsible for creating either a MicroBatchStream or a ContinuousStream object, which the Spark engine then uses to read data from the data source.
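The micro-batch stream is where offsets are tracked and the partitions to read are planned. The sketch below is heavily simplified (a single partition per batch, an in-memory counter as the offset) and only meant to show which methods have to be implemented:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
import org.apache.spark.unsafe.types.UTF8String

// Made-up offset type that simply counts records.
case class ExampleOffset(count: Long) extends Offset {
  override def json(): String = count.toString
}

case class ExamplePartition(start: Long, end: Long) extends InputPartition

class ExampleMicroBatchStream(checkpointLocation: String) extends MicroBatchStream {

  // Pretend that ten new records become available every time Spark asks for the latest offset.
  private var latest = 0L

  override def initialOffset(): Offset = ExampleOffset(0L)

  override def latestOffset(): Offset = { latest += 10; ExampleOffset(latest) }

  override def deserializeOffset(json: String): Offset = ExampleOffset(json.toLong)

  // One partition covering the requested offset range; a real source would
  // split the range across several partitions.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
    Array(ExamplePartition(
      start.asInstanceOf[ExampleOffset].count,
      end.asInstanceOf[ExampleOffset].count))

  override def createReaderFactory(): PartitionReaderFactory = new ExampleReaderFactory

  override def commit(end: Offset): Unit = () // nothing to clean up in this sketch

  override def stop(): Unit = ()
}

// Shipped to the executors, where it creates one reader per partition.
class ExampleReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[ExamplePartition]
    new ExamplePartitionReader(p.start, p.end)
  }
}

class ExamplePartitionReader(start: Long, end: Long) extends PartitionReader[InternalRow] {
  private var current = start - 1

  override def next(): Boolean = { current += 1; current < end }

  override def get(): InternalRow = InternalRow(UTF8String.fromString(s"record-$current"))

  override def close(): Unit = ()
}
```

Assuming everything is packaged on the classpath, such a source should then be usable via spark.readStream.format(...), passing either the fully qualified name of DefaultSource or its package name.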
Data Sinks
Data sinks actually use the same TableProvider API and are implemented in a similar way. Instead of implementing the SupportsRead interface, data sinks implement the SupportsWrite interface. This also means specifying the STREAMING_WRITE capability for streaming data sinks.
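As a rough sketch (again with made-up class names such as ExampleSinkTable), a streaming sink's table could look like this:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Made-up sink table; a TableProvider (DefaultSource) would return it from getTable.
class ExampleSinkTable extends Table with SupportsWrite {

  override def name(): String = "example-sink"

  // Minimal schema; this sketch does not validate the incoming data's schema.
  override def schema(): StructType = new StructType()

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.STREAMING_WRITE)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new ExampleWriteBuilder(info) // defined in the next snippet
}
```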
Analogous to the ScanBuilder and Scan objects for data sources, data sinks have to create a WriteBuilder, which in turn has to return an object implementing the StreamingWrite interface.
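The corresponding WriteBuilder can be very small (ExampleWriteBuilder and ExampleStreamingWrite are again hypothetical names):

```scala
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

class ExampleWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {

  // In Spark 3.0, WriteBuilder has a dedicated method for streaming writes.
  override def buildForStreaming(): StreamingWrite =
    new ExampleStreamingWrite(info.schema())
}
```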
Finally, the StreamingWrite object has to create a StreamingDataWriterFactory, which in turn creates a DataWriter that is ultimately responsible for writing the data.
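To round off the sketch, here is what the remaining write-side classes might look like; a real sink would of course write to an external system instead of printing, and the schema is not actually used here:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

class ExampleStreamingWrite(schema: StructType) extends StreamingWrite {

  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
    new ExampleWriterFactory

  // Called on the driver once all writers of an epoch have committed.
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
}

// Shipped to the executors; creates one writer per partition and epoch.
class ExampleWriterFactory extends StreamingDataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new ExampleDataWriter
}

case object ExampleCommitMessage extends WriterCommitMessage

class ExampleDataWriter extends DataWriter[InternalRow] {

  // A real sink would buffer the row or send it to an external system here.
  override def write(record: InternalRow): Unit = println(record)

  override def commit(): WriterCommitMessage = ExampleCommitMessage

  override def abort(): Unit = ()

  override def close(): Unit = ()
}
```

The commit messages returned by the DataWriter instances travel back to the driver, where StreamingWrite.commit receives them once per epoch.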
References
I found much of the information on Spark 3.0 data sources in the following Stack Overflow answer:
Additionally, there is a blog post series by Madhukara Phatak on the Data Source V2 API in Spark 3.0. I've only read the part on the write API to understand data sinks: