BH Scan Framework
As explained earlier, the goal of the batch size project is to implement controls over batch sizes. The readers turned out to be the largest source of batch size variation for a very simple reason: readers typically have no way of knowing the size of incoming data until it is read. Because of this, readers have typically limited batches by row count: sometimes 1K, sometimes 4000, sometimes 4K, sometimes 64K. But are row widths really well enough behaved that, say, 4K rows will always fall within the desired batch size range? Or are rows so variable that 4K rows can range in size from 16K (integers) to 4 GB (blog posts)? Experience suggests that row widths are, in fact, highly variable, and so we need a mechanism (the result set loader) to track incoming data size and decide when to end one batch and start the next.
Our goal, then, became to rip out the code that readers now use to write to vectors (in all its wondrous variation) and replace that code with the use of the result set loader. Looking into this revealed that not only is vector writing different in each reader, but so is projection. Hence the revised projection framework just discussed.
With the result set loader and projection framework in hand, we then need a structure that uses them. Rather than try to fit this structure into the existing readers and scan operators, the simpler and more robust path is to devise a new scan framework that hosts the new services, and simply preserve the source-specific bits of each reader, ported to the new framework.
The goal of this section is to describe the revised scan framework. Later sections will describe how the porting was done for the CSV and JSON readers.
After reviewing the existing set of readers and the existing scan operator, we identified the following requirements:
- Common mechanism for writing to vectors (the result set loader)
- Common mechanism for projection (the projection framework)
- Ability to run unit tests without the full Drill server (the operator framework)
- Scan is used for many kinds of readers. Factor out the common parts, and allow other parts to be built up via composition of reader-specific bits.
These requirements led us to refactor the scan operator into a set of modules:
- The scan operator itself, which does nothing more than manage a set of readers.
- The batch reader, which negotiates schema and reads rows.
- The task-specific reader framework, which handles the tasks unique to, say, files versus databases.
We will discuss each component. To avoid being lost in the trees, please keep the above forest in mind.
Let's look at the lifecycle of the scan operator from a number of perspectives.
The new scan operator works within the new Operator Framework by implementing the `OperatorExec` interface as follows:
- Constructor: Takes a `ScanOperatorEvents` which provides task-specific callbacks for each scan event.
- `buildSchema()`: Loads the first reader, obtains the schema from that reader (see below), and returns an empty batch with that schema.
- `next()`: Reads a batch of data, advancing through readers as needed.
- `close()`: Closes the current reader, if any, and frees resources.
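Reduced to the parts the scan operator actually exercises, that contract looks roughly like the sketch below. This is only a sketch: the constructor belongs to the implementing class, and the real `OperatorExec` interface carries additional lifecycle methods not shown here.

```java
// Rough shape of the OperatorExec contract as the scan operator uses it;
// the real interface carries additional lifecycle methods not shown here.
public interface OperatorExec {
  boolean buildSchema();  // load the first reader; report an empty batch with its schema
  boolean next();         // read one batch, advancing through readers as needed
  void close();           // close the current reader, if any, and free resources
}
```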
Under the new scan framework, the reader lifecycle is precisely defined by the `RowBatchReader` interface:
- Reader is created. This can be done up front (as is done today) or incrementally.
- `open()`: Open the reader. This can succeed (return `true`), fail softly (return `false`), or fail hard (throw an exception). A soft failure might be opening a file that no longer exists and so should be ignored. A hard failure might be a disk I/O error.
- `next()`: Read a batch of data. This can return data or not. It can also indicate, if data is returned, that this is known to be the last batch.
- `schemaVersion()`: Returns an increasing number indicating the schema version. A change in the number indicates that the reader changed the schema.
- `close()`: Release resources.
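Rendered as Java, that lifecycle corresponds roughly to the interface sketched below. This is inferred from the description above; the actual `RowBatchReader` may carry additional members (for example, access to the batch that `next()` produced).

```java
// Sketch of the lifecycle above as a Java interface; the real
// RowBatchReader may carry additional members not shown here.
public interface RowBatchReader {
  boolean open();       // true = ready; false = soft failure, skip this reader
  boolean next();       // true = a batch of rows is available
  int schemaVersion();  // increases whenever this reader changes the schema
  void close();         // release any resources held by the reader
}
```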
This is the basic flow: it makes no assumptions about how a batch is created or how schema is managed. Those details are handled by an application-specific scan framework as described below.
The reader lifecycle is complex: errors can happen at any time. Once we discuss the result set loader, we'll see a very specific set of steps required. In fact, the reader lifecycle is so complex that it is factored out of the scan operator into its own class: `ReaderState`.
The core scan framework classes know nothing about schemas, projection and the like. Rather, they simply provide lifecycle management. The next level of functionality resides in a scan framework. The idea is that files need different services than, say, JDBC or Kafka. The scan framework provides the required services, building on a basic platform that includes the result set loader and projection mechanisms. This also leaves open the possibility that, perhaps, there may be some reader for which these mechanisms are not needed: the core scan framework allows completely new implementations. But, for now, we'll focus on the one used for files.
A framework is an implementation of `ScanOperatorEvents` that provides, essentially, a single operation:
- `nextReader()`: Return the next reader to run.
All of the complexity resides in the way that we create and manage that reader.
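To make the division of labor concrete, the sketch below shows, in much-simplified form, the kind of loop the scan operator runs on top of `ScanOperatorEvents` and `RowBatchReader`. It is not the actual implementation: error handling, `ReaderState`, and the schema-only first batch are omitted, and it assumes `nextReader()` hands back a `RowBatchReader`.

```java
// Much-simplified reader-management loop; error handling, ReaderState,
// and the schema-only first batch are omitted.
public class ScanLoopSketch {
  private final ScanOperatorEvents framework;
  private RowBatchReader reader;

  public ScanLoopSketch(ScanOperatorEvents framework) {
    this.framework = framework;
  }

  // Returns true as long as a batch is available for the downstream operator.
  public boolean next() {
    while (true) {
      if (reader == null) {
        reader = framework.nextReader();  // ask the framework for the next reader
        if (reader == null) {
          return false;                   // no readers left: the scan is finished
        }
        if (!reader.open()) {             // soft failure: skip this reader
          reader = null;
          continue;
        }
      }
      if (reader.next()) {
        return true;                      // reader produced a batch
      }
      reader.close();                     // EOF: move on to the next reader
      reader = null;
    }
  }
}
```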
The bulk of the framework resides in `AbstractScanFramework<T extends SchemaNegotiator>`. Notice the parameter: it is how a reader coordinates with the framework.
In this framework, readers extend `ManagedReader<T extends SchemaNegotiator>`:
- `open(T negotiator)`: The reader opens its data source, whatever it is. The reader determines if it is "early" or "late" schema. Let's say it is "early" schema, such as Parquet. The reader determines the table schema and reports it to the framework using the `SchemaNegotiator`, receiving a result set loader in response, which the reader uses to load data.
- `next()`: Using the result set loader obtained in `open()`, read a batch of rows, or report EOF.
Here is the full interface:
```java
public interface ManagedReader<T extends SchemaNegotiator> {
  boolean open(T negotiator);
  boolean next();
  void close();
}
```
Interfaces don't get much simpler! (Much effort went into making this simple so that it becomes very easy for community contributors to create new readers.)
The system provides different schema negotiators for different tasks. Here is the one for a file-based reader:
```java
public interface SchemaNegotiator {
  OperatorContext context();
  void setTableSchema(TupleMetadata schema);
  void setBatchSize(int maxRecordsPerBatch);
  ResultSetLoader build();
  boolean isProjectionEmpty();
}
```
The idea is that the schema negotiator provides all the information that the reader needs. This way, we pass a single value to the reader (rather than a zillion parameters) and the reader can pull out items of interest. The same mechanism allows the reader to specify a schema (if it has one), and to obtain a result set loader "primed" with the table schema and set of projected columns.
The `isProjectionEmpty()` method handles the special case of a `SELECT COUNT(*)` query: it tells the reader it can skip reading any data if it can implement a fast path to provide just the row count.
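As a rough illustration (not code from the Drill source), an early-schema reader's `open()` might use the negotiator along these lines. The column names, the `SchemaBuilder` usage, and the `countOnly` flag are hypothetical.

```java
// Hypothetical early-schema open(): declare the table schema, receive a
// primed result set loader, and note the SELECT COUNT(*) fast path.
@Override
public boolean open(SchemaNegotiator negotiator) {
  TupleMetadata schema = new SchemaBuilder()
      .add("id", MinorType.INT)             // columns known before reading
      .add("name", MinorType.VARCHAR)
      .buildSchema();
  negotiator.setTableSchema(schema);        // report the "early" schema
  loader = negotiator.build();              // loader primed with schema and projection

  // SELECT COUNT(*): nothing is projected, so the reader may skip
  // decoding column values and simply count rows.
  countOnly = negotiator.isProjectionEmpty();
  return true;                              // open succeeded
}
```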
The scan framework was specifically designed to simplify the task of developing new readers. (The ability to extend Drill with new readers is one of Drill's unique differentiators, so we should make this task as easy as possible.) Here is what a developer would need to do.
First, using the "Easy" plugin, define a method that instantiates the custom reader following a process we'll cover in the Easy plugin section. Basically, implement a class that will create each reader as it is needed, passing in information such as the Drill file system, file name, and so on.
Next, define the reader class as implementing the `ManagedReader` interface. In that class, implement the `open()` method to open the underlying data source, however that needs to be done. If the data source can provide a schema, pass it to the schema negotiator. Ask the schema negotiator for a result set loader, which is retained by the reader.
In the `next()` method, read rows until either the result set loader says the batch is full, or the data source reaches EOF. If the reader must read all columns (as in a text file reader), just write all the columns to the result set loader; it will figure out which are projected.
If the reader is more complex (such as JSON or Parquet), then the reader may have a way of working with projected columns differently than non-projected columns. In this case, obtain a column writer only for the projected columns.
In the `close()` method, release resources.
That's it. No projection. No mucking about with vectors. Just clean focus on the job of reading data and passing it to the result set loader.
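Putting the steps together, a minimal reader might look like the sketch below. It is not taken from the Drill source: the `RecordSource` data source is fictitious, and the row-writing calls (`writer()`, `start()`, `save()`, `isFull()`) are meant only to suggest how the result set loader described earlier is used.

```java
// Hypothetical minimal reader; RecordSource and its methods are fictitious,
// and the writer calls only suggest how the result set loader is used.
public class ExampleReader implements ManagedReader<SchemaNegotiator> {
  private RecordSource source;       // fictitious underlying data source
  private ResultSetLoader loader;

  @Override
  public boolean open(SchemaNegotiator negotiator) {
    source = RecordSource.open();                 // open the data source
    negotiator.setTableSchema(source.schema());   // report schema, if known up front
    loader = negotiator.build();                  // primed result set loader
    return true;                                  // false would mean "skip this reader"
  }

  @Override
  public boolean next() {
    RowSetLoader writer = loader.writer();
    while (!writer.isFull()) {                    // the loader decides when the batch is full
      if (!source.hasNext()) {
        return false;                             // EOF: this reader is done
      }
      writer.start();                             // begin a row
      source.copyRowTo(writer);                   // set each projected column value
      writer.save();                              // commit the row
    }
    return true;                                  // batch is full; more rows remain
  }

  @Override
  public void close() {
    if (source != null) {
      source.close();                             // release the data source
      source = null;
    }
  }
}
```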
This is best illustrated by looking at unit tests. See, for example, `TestFileScanFramework`, which defines a variety of record batch readers in order to test the scan framework.
The scan framework works and satisfies its requirements. However, it feels just a bit clunky. There is an opportunity to streamline it as we gain experience.