Paul Rogers edited this page Dec 4, 2019 · 10 revisions

Create the Group and Sub Scans

We have now helped Calcite resolve a table name. Next we begin to define what we will do with the table, which is to scan (read) it. Some systems are fancier than others: some read a single, contiguous data source, some read "splits" of a data source (such as file blocks in HDFS), some arbitrarily break up a scan into chunks, others "push" the scan to an external system, and so forth. Calcite provides a great deal of flexibility to define these cases. For our needs, we assume we scan a single data source from start to end.

Drill uses a three-stage process to convert a table into a scan operator definition:

  • Resolve the table to create a scan specification.
  • Build up the scan as a group scan. The group scan handles project push-down (columns), optional filter push-down, parallelization and more.
  • Determine the information needed to perform the physical scan and create a sub (specific) scan specification.
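
As a rough illustration of how the three stages hand off to one another, here is a minimal sketch in plain Java. The classes SketchScanSpec, SketchGroupScan, SketchSubScan and ThreeStageSketch are hypothetical stand-ins invented for this example, not Drill classes, and columns are simplified to strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the three planning stages; none of these are Drill classes.
final class SketchScanSpec {
  final String tableName;
  SketchScanSpec(String tableName) { this.tableName = tableName; }
}

final class SketchSubScan {
  final SketchScanSpec spec;
  final List<String> columns;
  SketchSubScan(SketchScanSpec spec, List<String> columns) {
    this.spec = spec;
    this.columns = columns;
  }
}

final class SketchGroupScan {
  final SketchScanSpec spec;
  final List<String> columns;

  // A new group scan starts with no projection information.
  SketchGroupScan(SketchScanSpec spec) {
    this(spec, Collections.<String>emptyList());
  }

  private SketchGroupScan(SketchScanSpec spec, List<String> columns) {
    this.spec = spec;
    this.columns = columns;
  }

  // Project push-down: return a copy that carries the projected columns.
  SketchGroupScan withColumns(List<String> projected) {
    return new SketchGroupScan(spec, projected);
  }

  // Our table is a single chunk, so every fragment gets the same description.
  SketchSubScan specificScan(int minorFragmentId) {
    return new SketchSubScan(spec, columns);
  }
}

final class ThreeStageSketch {
  static SketchSubScan plan(String tableName, List<String> projected) {
    SketchScanSpec spec = new SketchScanSpec(tableName);                       // stage 1
    SketchGroupScan group = new SketchGroupScan(spec).withColumns(projected);  // stage 2
    return group.specificScan(0);                                              // stage 3
  }

  public static void main(String[] args) {
    SketchSubScan sub = plan("myTable", Arrays.asList("a", "b", "c"));
    System.out.println(sub.spec.tableName + " " + sub.columns);
  }
}
```

The real Drill classes described below carry more information (plugin config, user name, cost estimates), but the flow of data from one stage to the next follows this shape.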

Scan Specification

We start by defining a "scan specification": a description of the overall logical scan, before we've started worrying about the details. Our class just needs the table name:

public class ExampleScanSpec {

  private final String tableName;

  @JsonCreator
  public ExampleScanSpec(@JsonProperty("tableName") String tableName) {
    this.tableName = tableName;
  }

  public String getTableName() { return tableName; }
}

You can add any additional information needed by your plugin (schema path, table metadata, etc.). For this reason, the scan spec is unique to each plugin; there is no common base class.

Create the Schema Spec

Tell Drill about the scan spec by attaching it to the Drill table created in the schema:

  class DefaultSchema extends AbstractSchema {
    ...
    @Override
    public Table getTable(String tableName) {
      ...
        return registerTable(name,
            new DynamicDrillTable(plugin, plugin.getName(),
            new ExampleScanSpec(name)));
      ...

Test the Scan Spec

We can now repeat our test, stepping through the code to verify that the scan spec and dynamic table are created correctly.

Group Scan Definition

Drill is a member of the Hadoop ecosystem, and so supports the idea that a table may be represented by a directory of files, where each file consists of blocks distributed across storage nodes. A "group scan" is the term for the logical scan of the pieces that make up a table, and represents one additional layer of refinement from the scan specification.

As we have noted, Drill uses Calcite for query planning, and so the scan objects fit into the Calcite structure. As a result, the group (and later, "sub") scans are a bit complex.

In our case, our table consists of a single chunk of data, so the "group" consists of a single physical scan.

The group scan brings together four layers of information:

  • The configuration of the storage plugin (the "storage engine"),
  • The (optional) schema and table,
  • The set of columns,
  • Plugin-specific implementation details for the scan.

That is, the group scan extends the scan spec by providing a list of columns from our query:

SELECT * FROM example.myTable

SELECT a, b, c FROM example.myTable

Drill uses schema-on-read, so we will assume that we can figure out the column names and types at runtime. However, if we know the available columns and types at plan time, we can tell Calcite to use that information. See the existing storage plugins to see how that is done.

We need the group scan class and a variety of constructors:

public class ExampleGroupScan extends BaseGroupScan {

  private final ExampleScanSpec scanSpec;

  // Initial constructor
  public ExampleGroupScan(ExampleStoragePlugin storagePlugin, String userName,
      ExampleScanSpec scanSpec) {
    super(storagePlugin, userName, null);
    this.scanSpec = scanSpec;
  }

  // Copy with columns
  public ExampleGroupScan(ExampleGroupScan from, List<SchemaPath> columns) {
    super(from.storagePlugin, from.getUserName(), columns);
    this.scanSpec = from.scanSpec;
  }

  // Deserialization constructor
  @JsonCreator
  public ExampleGroupScan(
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("userName") String userName,
      @JsonProperty("columns") List<SchemaPath> columns,
      @JsonProperty("scanSpec") ExampleScanSpec scanSpec,
      @JacksonInject StoragePluginRegistry engineRegistry) {
    super(config, userName, columns, engineRegistry);
    this.scanSpec = scanSpec;
  }

This class is part of the "logical plan" and thus must be Jackson serializable.

The config, userName, columns, and engineRegistry fields are all handled by the base class. scanSpec is a proxy for whatever information you choose to store in your class.

Create the Group Scan with the Scan Factory

The process to create a group scan from a scan spec is a bit complex. The base class tries to simplify this using a "scan factory." Back in the storage plugin class:

public class ExampleStoragePlugin extends BaseStoragePlugin<ExampleStoragePluginConfig> {

  private static class ExampleScanFactory extends
      BaseScanFactory<ExampleStoragePlugin, ExampleScanSpec,
                      ExampleGroupScan, ExampleSubScan> {

    @Override
    public ExampleGroupScan newGroupScan(ExampleStoragePlugin storagePlugin,
        String userName, ExampleScanSpec scanSpec,
        SessionOptionManager sessionOptions,
        MetadataProviderManager metadataProviderManager) {
      return new ExampleGroupScan(storagePlugin, userName, scanSpec);
    }

    @Override
    public ExampleGroupScan groupWithColumns(ExampleGroupScan group,
        List<SchemaPath> columns) {
      return new ExampleGroupScan(group, columns);
    }

    @Override
    public ScanFrameworkBuilder scanBuilder(ExampleStoragePlugin storagePlugin,
        OptionManager options, ExampleSubScan subScan) {
      return null; // TODO
    }
  }

  private static StoragePluginOptions buildOptions() {
    ...
    options.scanFactory = new ExampleScanFactory();
    return options;
  }

The scan factory class provides a type-friendly way to create instances in lieu of the somewhat obscure way other plugins do the same work. Notice the use of two of the group scan constructors just created.
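
To illustrate what "type-friendly" means here, the following is a minimal sketch of a generic factory in plain Java. TypedScanFactory, DemoSpec, DemoGroupScan, DemoScanFactory and TypedFactoryDemo are hypothetical names invented for this example and are not Drill APIs; the real BaseScanFactory takes more type parameters and arguments, as shown above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a typed scan factory. None of these types are Drill APIs.
interface TypedScanFactory<S, G> {
  G newGroupScan(String userName, S scanSpec);
  G groupWithColumns(G group, List<String> columns);
}

final class DemoSpec {
  final String tableName;
  DemoSpec(String tableName) { this.tableName = tableName; }
}

final class DemoGroupScan {
  final String userName;
  final DemoSpec spec;
  final List<String> columns;

  DemoGroupScan(String userName, DemoSpec spec, List<String> columns) {
    this.userName = userName;
    this.spec = spec;
    this.columns = columns;
  }
}

final class DemoScanFactory implements TypedScanFactory<DemoSpec, DemoGroupScan> {
  @Override
  public DemoGroupScan newGroupScan(String userName, DemoSpec scanSpec) {
    return new DemoGroupScan(userName, scanSpec, Collections.<String>emptyList());
  }

  @Override
  public DemoGroupScan groupWithColumns(DemoGroupScan group, List<String> columns) {
    return new DemoGroupScan(group.userName, group.spec, columns);
  }
}

final class TypedFactoryDemo {
  // Generic framework-side helper: it works for any plugin's spec and group
  // scan types, and hands back the concrete group scan type without casts.
  static <S, G> G planScan(TypedScanFactory<S, G> factory, String userName,
      S spec, List<String> columns) {
    G group = factory.newGroupScan(userName, spec);
    return factory.groupWithColumns(group, columns);
  }

  public static void main(String[] args) {
    DemoGroupScan scan = planScan(new DemoScanFactory(), "alice",
        new DemoSpec("myTable"), Arrays.asList("a", "b", "c"));
    System.out.println(scan.spec.tableName + " " + scan.columns);
  }
}
```

Because the type parameters tie the spec and group scan types together, the compiler rejects a factory that is handed the wrong plugin's objects, which is the main benefit over passing untyped objects around.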

Basic Operations

The group scan class needs a number of methods to help with planning. First, we have to tell Drill how to assign the scan to nodes. By default, we do no assignments and let Drill decide:

  @Override
  public void applyAssignments(List<DrillbitEndpoint> endpoints) { }

Next we tell Drill how much it can parallelize the scan. We assume we are calling an external service, so we allow only one thread of execution per query:

  @Override
  public int getMaxParallelizationWidth() { return 1; }

If you use EVF, the Enhanced Vector Framework, to implement your reader (and you probably should), then your reader can handle projection, so we tell Drill that project push-down is supported:

  @Override
  public boolean canPushdownProjects(List<SchemaPath> columns) {
    return true;
  }

Cost Estimates

Calcite is a cost-based planner: it will use cost information to choose among plans. We must provide a cost estimate, however crude.

  @Override
  public ScanStats getScanStats() {
    // No good estimates at all, just make up something.
    int estRowCount = 10_000;
    int estDataSize = estRowCount * 200;
    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, estDataSize);
  }

If you are reading from a file, you can estimate the data size from the file size, and estimate the row count by dividing the file size by an assumed average row width in bytes, say 50, 100 or 200, depending on the data. If you are reading a database or some other system, you may be able to get accurate estimates. If reading from, say, a REST service, you may have no data at all and must just make something up.

At a minimum, make the estimate clearly small or clearly large: small if you know it is safe for Drill to broadcast the data to all nodes (for a hash join, say), and large if Drill should never make copies of the data.
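
The file-size rule of thumb can be sketched as a small helper. This is only an illustration: ScanEstimateSketch, estimateRowCount and DEFAULT_ROW_COUNT are hypothetical names, not part of Drill, and the fallback simply reuses the made-up 10,000 rows from the example above.

```java
// Hypothetical helper, not part of Drill: derives a rough row count from a file size.
final class ScanEstimateSketch {

  // Assumed fallback when nothing is known about the source.
  static final long DEFAULT_ROW_COUNT = 10_000;

  // fileSizeBytes: size of the file, or a negative value if unknown.
  // avgRowWidthBytes: guessed average row width, for example 50, 100 or 200.
  static long estimateRowCount(long fileSizeBytes, int avgRowWidthBytes) {
    if (avgRowWidthBytes <= 0) {
      throw new IllegalArgumentException("average row width must be positive");
    }
    if (fileSizeBytes < 0) {
      return DEFAULT_ROW_COUNT;
    }
    // Never report zero rows for a file that exists; one row is the floor.
    return Math.max(1, fileSizeBytes / avgRowWidthBytes);
  }

  public static void main(String[] args) {
    System.out.println(estimateRowCount(2_000_000L, 200));
    System.out.println(estimateRowCount(-1L, 200));
  }
}
```

In getScanStats(), the result could replace the hard-coded estRowCount, with the file size itself used as the data size estimate.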

Create the Sub Scan

The group scan also implements the next step in the planning process: creating specific scans. We'll stub this part out for now:

  @Override
  public SubScan getSpecificScan(int minorFragmentId) {
    // TODO Auto-generated method stub
    return null;
  }

Boilerplate Methods

Finally, we need some boilerplate required by Drill:

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    // Use the "copy with columns" constructor defined earlier
    return new ExampleGroupScan(this, columns);
  }

  @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
    Preconditions.checkArgument(children.isEmpty());
    // Copy this scan, keeping the columns held by the base class
    return new ExampleGroupScan(this, columns);
  }

  @Override
  public String getDigest() {
    return toString();
  }

  @Override
  public String toString() {
    return "Example scan of " + scanSpec.getTableName();
  }

Create the Group Scan Instance

Add the following method to the plugin class:

  @Override
  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
    ExampleScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<ExampleScanSpec>() { });
    // Use the initial constructor defined earlier: (plugin, userName, scanSpec)
    return new ExampleGroupScan(this, userName, scanSpec);
  }

This code deserializes a JSON representation of your scan spec. (It is not clear why the scan spec is in JSON at this point.)

Test the Group Scan

We can now run our test again. First set a breakpoint in getSpecificScan, then run the test. This will verify that things work up to this point.

Specific Scan

The group scan represents the general idea of "scan table X." The specific scan (also called a "sub scan") contains the information needed to implement the physical scan, including Hadoop splits, database shards or whatever is needed by the storage engine. In our case, we assume a single scan operator that just needs the information gathered above.

The specific scan is serialized to JSON and sent to each Drillbit, where it is deserialized and passed to the scan operator.

@JsonTypeName("example-sub-scan")
public class ExampleSubScan extends AbstractSubScan {

  private final ExampleStoragePluginConfig pluginConfig;
  private final ExampleScanSpec tableSpec;
  private final List<SchemaPath> columns;

  public ExampleSubScan(
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("tableSpec") ExampleScanSpec tableSpec,
      @JsonProperty("columns") List<SchemaPath> columns) {
    super("user-if-needed");
    this.pluginConfig = config;
    this.tableSpec = tableSpec;
    this.columns = columns;
  }

  @Override
  @JsonIgnore
  public int getOperatorType() {
    return CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE;
  }

  public ExampleStoragePluginConfig getConfig() {
    return pluginConfig;
  }

  public ExampleScanSpec getTableSpec() {
    return tableSpec;
  }

  public List<SchemaPath> getColumns() {
    return columns;
  }
}

Notice that we have included the storage plugin config, not the storage plugin itself. The config is Jackson-serializable; the plugin is not. We now see why we made the scan spec serializable: it will be included in our sub scan.

Operator Type

Notice above that we referenced CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE. This is a unique numeric identifier for each operator in Drill. Unfortunately, all operators must be defined in a single global class, which makes it hard to add a true plugin not known to Drill itself. Until this is resolved, you must modify UserBitShared.proto to add your operator type:

enum CoreOperatorType {
  SINGLE_SENDER = 0;
  ...
  SHP_SUB_SCAN = 65;
  EXAMPLE_SUB_SCAN = 66;
}

Choose the next available number for your ID.

Then, rebuild the Protobuf files as described in protocol/readme.txt.

Create the Sub Scan Instance

We now connect up our sub scan with the group scan:

public class ExampleGroupScan extends BaseGroupScan {
  ...

  @Override
  public SubScan getSpecificScan(int minorFragmentId) {
    return new ExampleSubScan(config, scanSpec, columns);
  }

Test the Sub Scan

Set a breakpoint in the above method and run the test case. Execution should stop there and you can verify that your sub scan is created as you expect.

With this, we are done with the planner side of the project and are ready to move onto the execution side.
