Working with VARCHAR Data in UDFs
As described in the previous section, most Drill types provide a holder that contains all the data you need. But the variable-width types (VARCHAR and VARBINARY) are more complex, as they provide direct access to the underlying data stored in direct memory. This page explains how to work effectively with these types.
One thing to keep in mind: direct memory is exactly that: memory outside of the Java heap. Java can provide no protection. If your code writes to the wrong memory location, you can crash the Drillbit process with no logging to help you figure out your error.
Drill stores VARCHAR
as bytes encoded in UTF-8. To properly handle such data, you must write code that is aware of the UTF-8 encoding. Doing so is quite complex, so one choice is to convert the UTF-8 data to Java String
, then convert the result back to UTF-8. This conversion does, however, consume time. So, Drill often takes a shortcut: it works with the single-byte subset of UTF-8 (also known as ASCII.) While this can produce incorrect results with non-ASCII data, it seems to work well for many cases. (Yes, this does seem more like a bug than a feature...)
So, before you begin writing code, consider whether your input data will always be ASCII. If so, you can work with the UTF-8 data as a buffer of ASCII characters. But, if your data includes non-ASCII characters (or you wish to contribute your code to the Apache Drill project), you must either handle multi-byte characters, or do the conversion to String.
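One way to check the "always ASCII" assumption against sample data is to look for any byte with the high bit set. The following is a minimal sketch in plain Java, not part of Drill: the AsciiCheck class and its isAscii() method are hypothetical names, and it uses java.nio.charset.StandardCharsets rather than the Guava Charsets class used elsewhere on this page.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not a Drill API.
final class AsciiCheck {

    /** Returns true if every byte is in the single-byte (ASCII) range 0x00-0x7F. */
    static boolean isAscii(byte[] utf8) {
        for (byte b : utf8) {
            // Java bytes are signed, so any byte with the high bit set is negative.
            if (b < 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] plain = "aA 12 zZ!".getBytes(StandardCharsets.UTF_8);
        byte[] chinese = "\u6d77.".getBytes(StandardCharsets.UTF_8);
        System.out.println(isAscii(plain));
        System.out.println(isAscii(chinese));
    }
}
```

If a check like this fails on representative data, the byte-oriented approach described above is probably not safe for that column.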
The VarCharHolder
provides a view into the underlying VarCharVector
. Before we look at the holder, let's understand the vector.
The VarCharVector
is comprised of two parts, as described in the Drill documentation:
- A data vector which is a large buffer that contains all the column values concatenated together.
- An offset vector which provides the start and end position of the value for each row.
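The two-part layout can be modeled in plain Java as a rough sketch. This is not Drill's VarCharVector: the VarCharVectorModel class is a hypothetical stand-in that uses heap arrays instead of direct memory, and it assumes the offsets are stored as one array with one more entry than there are rows, so that the end of one value is the start of the next.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical heap-based model of the data vector plus offset vector.
final class VarCharVectorModel {
    final byte[] data;    // all values concatenated, UTF-8 encoded
    final int[] offsets;  // offsets[i] = start of row i, offsets[i + 1] = end of row i

    VarCharVectorModel(String... values) {
        offsets = new int[values.length + 1];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < values.length; i++) {
            byte[] bytes = values[i].getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
            offsets[i + 1] = out.size();
        }
        data = out.toByteArray();
    }

    int start(int row) { return offsets[row]; }

    int end(int row) { return offsets[row + 1]; }

    /** Decodes the bytes for one row back into a Java String. */
    String get(int row) {
        return new String(data, start(row), end(row) - start(row), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VarCharVectorModel v = new VarCharVectorModel("ab", "", "\u6d77");
        for (int row = 0; row < 3; row++) {
            System.out.println(row + ": [" + v.start(row) + ", " + v.end(row) + ") " + v.get(row));
        }
    }
}
```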
So, to work with the VARCHAR
value for a specific row, we need:
- The buffer that holds all the values
- The start position of the current value
- The end position of the current value (from which we can compute the length as end - start)
Unsurprisingly, the above is exactly what the VarCharHolder
provides:
public final class VarCharHolder implements ValueHolder{
public int start;
public int end;
public DrillBuf buffer;
The first two fields make sense, but what is a DrillBuf
? Here is where Drill assumes that UDF writers are already familiar with Drill internals. The DrillBuf
is an interface to a direct memory buffer. If you have downloaded Drill sources, and have set up a development environment, you can take a look at the DrillBuf
source code since Drill provides no Javadoc for any of its APIs.
For our purposes, we will need the following methods when working with parameters:
public byte getByte(int index);
public ByteBuf getBytes(int index, byte[] dst);
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);
The first method allows us to work byte-by-byte. The second allows us to work with the entire value, such as when we wish to convert to strings (the length of the dst buffer says how many bytes to copy). The last version is general purpose, for when we need slightly more control.
Let's write two functions: one which counts letters using the single-byte subset of UTF-8 (AKA ASCII), the other which uses Unicode. These illustrate the two ways to work with VARCHAR data, and the pitfalls of working only with ASCII.
Here is the ASCII (byte-oriented) version:
public class CountLettersFunctions {
@FunctionTemplate(
name = "countLettersAscii",
scope = FunctionScope.SIMPLE,
nulls = NullHandling.NULL_IF_NULL)
public static class CountLettersAsciiFunction implements DrillSimpleFunc {
@Param VarCharHolder input;
@Output IntHolder output;
@Override
public void setup() { }
@Override
public void eval() {
int len = input.end - input.start;
output.value = 0;
for (int i = 0; i < len; i++) {
int c = (input.buffer.getByte(input.start + i) & 0xFF);
if (Character.isAlphabetic(c)) {
output.value++;
}
}
}
}
}
The code above ANDs (using the bitwise & operator) the return value of getByte() with 0xFF. Why? getByte() returns a signed 1-byte value. For values with the high bit set, the value is sign-extended when widened to an int, producing negative numbers. The AND ensures that the returned values are in the range 0-255.
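The effect of the mask is easy to see in isolation. The sketch below is plain Java with a hypothetical class name; it uses the byte 0xD0, which is the lead byte of several Cyrillic characters in UTF-8.

```java
// Hypothetical demo class, not part of Drill.
final class SignExtensionDemo {

    /** Widens a byte to an int without sign extension. */
    static int unsigned(byte b) {
        return b & 0xFF;
    }

    public static void main(String[] args) {
        byte b = (byte) 0xD0;
        int signExtended = b;       // widening keeps the sign
        int masked = unsigned(b);   // masking keeps only the low 8 bits
        System.out.println(signExtended); // prints -48
        System.out.println(masked);       // prints 208
    }
}
```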
Note that we are using a handy trick: defining more than one function in a single Java file. Drill uses this technique frequently.
Let's try some examples. Let's create letterInput.json
:
{ word: "aA 12 zZ!" }
{ word: "Αθήνα?" }
{ word: "-Mосква-" }
{ word: "海." }
Then run a query and look at the results:
SELECT word, countLettersAscii(word) FROM `cp`.`letterInput.json`
aA 12 zZ!,4
Αθήνα?,5
-Mосква-,7
海.,2
The first line is fine: 4 letters. The Greek is good (second line). But, the Russian (third line) is wrong: 7 characters instead of 6. And the Chinese (last line) reports 2 letters instead of 1. What is wrong?
The UTF-8 encoding of some of the characters is multi-byte, and our function treats each byte of such a sequence as a separate single-byte character. Some of those bytes happen to look like letters, so they are counted. (Another error that could occur, not shown here, is that a letter is not counted because it falls outside the ASCII letter range.)
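The miscount can be reproduced outside of Drill. The sketch below is plain Java with hypothetical names; it applies the same per-byte test as the function above to the UTF-8 bytes of a string. For the Russian sample, each Cyrillic letter starts with a lead byte (0xD0 or 0xD1) that, read as a single character, is itself a Latin letter, and one trailing byte (0xBA) also passes the test, which is how six letters become seven.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class that mirrors the byte-oriented loop in plain Java.
final class ByteWiseCount {

    /** Counts bytes that look alphabetic when each byte is treated as one character. */
    static int countLettersByteWise(String s) {
        int count = 0;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            if (Character.isAlphabetic(b & 0xFF)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // "-M" + Cyrillic "осква" + "-", written with escapes to avoid source-encoding issues.
        String russian = "-M\u043e\u0441\u043a\u0432\u0430-";
        String chinese = "\u6d77.";
        System.out.println(countLettersByteWise(russian));
        System.out.println(countLettersByteWise(chinese));
    }
}
```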
Clearly, in this case, we have to become UTF-8 experts. Or, we can pay the cost to convert UTF-8 to a Java String. Let's do that next.
To convert an array of UTF-8 bytes to a string, we write:
byte bytes[] = ...
String str = new String(bytes, com.google.common.base.Charsets.UTF_8);
You must provide the character set argument. Otherwise, Java will use the default character set for whatever machine is running Drill, which can produce inconsistent results.
Similarly, to convert from a String
to an array of bytes:
byte bytes[] = str.getBytes(com.google.common.base.Charsets.UTF_8);
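Putting the two conversions together gives a round trip. This is a minimal sketch with hypothetical names; it uses java.nio.charset.StandardCharsets.UTF_8, which names the same character set as the Guava constant shown above. Note that the byte count and the character count differ for non-ASCII text.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Drill.
final class Utf8RoundTrip {

    static byte[] toUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String fromUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "\u6d77.";           // one Chinese character plus a period
        byte[] bytes = toUtf8(original);
        String decoded = fromUtf8(bytes);
        System.out.println(bytes.length);       // prints 4: three bytes plus one
        System.out.println(original.length());  // prints 2: two UTF-16 chars
        System.out.println(decoded.equals(original));
    }
}
```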
Drill provides a number of helper functions to work with VARCHAR
columns, defined in the org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers
class. (Remember to use the fully-qualified name when referencing these methods.)
public static String getStringFromVarCharHolder(VarCharHolder varCharHolder);
public static String getStringFromVarCharHolder(NullableVarCharHolder varCharHolder);
public static String toStringFromUTF8(int start, int end, DrillBuf buffer);
Note that the functions only convert from VARCHAR
to String
. We have to roll our own for the reverse conversion.
Here is the Unicode version (in the same Java file) in which we let Java convert from UTF-8 to UTF-16 (used by the Java String
class):
@FunctionTemplate(
name = "countLetters",
scope = FunctionScope.SIMPLE,
nulls = NullHandling.NULL_IF_NULL)
public static class CountLettersFunction implements DrillSimpleFunc {
@Param VarCharHolder input;
@Output IntHolder output;
@Override
public void setup() { }
@Override
public void eval() {
String value = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(input);
output.value = 0;
for (int i = 0; i < value.length(); i++) {
if (Character.isAlphabetic(value.charAt(i))) {
output.value++;
}
}
}
}
Note that in the loop over the characters, we use the length of the String, not the buffer, since, in the UTF-8 multi-byte encoding, the numbers are different.
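One caveat that the example above does not cover: charAt() walks UTF-16 code units, so a letter outside the Basic Multilingual Plane occupies two chars (a surrogate pair), and neither half counts as alphabetic on its own. If such data matters, iterating over code points is one option. The following is a sketch in plain Java 8 or later, with hypothetical class and method names.

```java
// Hypothetical demo class comparing char-based and code-point-based counting.
final class CodePointCount {

    /** Counts letters one UTF-16 code unit at a time, as the UDF above does. */
    static int countByChar(String s) {
        int count = 0;
        for (int i = 0; i < s.length(); i++) {
            if (Character.isAlphabetic(s.charAt(i))) {
                count++;
            }
        }
        return count;
    }

    /** Counts letters one Unicode code point at a time. */
    static int countByCodePoint(String s) {
        return (int) s.codePoints().filter(Character::isAlphabetic).count();
    }

    public static void main(String[] args) {
        // 'a' followed by U+10400 (a Deseret capital letter), encoded as a surrogate pair.
        String s = "a\uD801\uDC00";
        System.out.println(countByChar(s));
        System.out.println(countByCodePoint(s));
    }
}
```

For the sample data on this page, all characters are in the Basic Multilingual Plane, so both approaches give the same counts.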
Let's try the query again:
aA 12 zZ!,4
Αθήνα?,5
-Mосква-,6
海.,1
Bingo! As we can see, we must trade off speed for accuracy. We must pay the cost to convert to Unicode in order to perform operations that depend on the Unicode encoding.
We've also seen how to work with the DrillBuf
provided in the VarCharHolder
class.
Returning a VARCHAR is a bit more complex. When working with parameters, Drill has already allocated the space for the incoming values; we simply work with that existing buffer. But for the output, we must create a buffer. For obscure reasons, Drill insists we put the return value in direct memory.
We don't write the value directly to the output vector; instead we write it to a temporary buffer which is copied to the output. (Because of this copy, it is not clear that there is much benefit creating a direct memory buffer. Either we should write to the output vector, or use a heap buffer for a temporary value. Sigh... We just have to work with the code the way it exists, not the way we'd like it to be.)
Suppose we want to create a function that returns the set of letter-like characters. We declare the output type as VARCHAR
as follows:
@FunctionTemplate(
name = "extractLetters",
scope = FunctionScope.SIMPLE,
nulls = NullHandling.NULL_IF_NULL)
public static class ExtractLettersFunction implements DrillSimpleFunc {
@Param VarCharHolder input;
@Output VarCharHolder output;
For reasons known only to Drill, we must tell Drill twice that we are returning a VARCHAR
: once in the @Output
annotation and a second time using the @Inject
annotation. Then, we must glue the two together in our code. (Since there is only one way to do this correctly, one wonders why Drill doesn't just do it for us...)
public static class ExtractLettersFunction implements DrillSimpleFunc {
...
@Inject DrillBuf outputBuf;
The injected buffer is initialized with a capacity of 256 bytes. If we need more, we must replace it with a larger buffer. The method for doing so is:
public DrillBuf reallocIfNeeded(final int size);
The buffer is reallocated to the new size. (Actually, it is allocated to the new size, rounded up to the next power of 2.) The return value is a new buffer that we use to replace the old one. Because of the way Drill works, both our old and new buffers are allocated until the query exits.
If we are curious about the buffer size, we can call:
public int capacity();
The buffer is injected once, then reused for each function call. The buffer is a wrapper around direct memory. Our code must allocate the required memory (as shown below.) But, since the buffer is reused, once our function runs for a while, we may be able to reuse the previous buffer, if it is long enough.
But, because we reuse the same buffer, and Drill will not release buffers until the query exits, we can end up wasting quite a bit of space if we constantly grow our buffer from 256 to 512, 1024 bytes and so on. There is nothing we can do to fix this; we can just be aware of the issue.
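To get a feel for the cost, here is a simplified model of the behavior described above. It is a sketch only, not Drill's allocator: the BufferGrowthModel class is hypothetical, and it assumes just what this page states (a 256-byte initial capacity, rounding up to the next power of 2, and old buffers staying allocated until the query exits).

```java
// Hypothetical model of buffer growth; does not allocate real memory.
final class BufferGrowthModel {
    static final int INITIAL_CAPACITY = 256;

    int capacity = INITIAL_CAPACITY;
    long totalAllocated = INITIAL_CAPACITY; // includes buffers we no longer use

    /** Rounds a positive size up to the next power of 2. */
    static int nextPowerOfTwo(int size) {
        if (size <= 1) {
            return 1;
        }
        return Integer.highestOneBit(size - 1) << 1;
    }

    /** Models a reallocation: grows only when the request exceeds the capacity. */
    void reallocIfNeeded(int size) {
        if (size <= capacity) {
            return; // the existing buffer is reused
        }
        capacity = nextPowerOfTwo(size);
        totalAllocated += capacity; // the old buffer is still held
    }

    public static void main(String[] args) {
        BufferGrowthModel model = new BufferGrowthModel();
        for (int request : new int[] {100, 300, 600, 200}) {
            model.reallocIfNeeded(request);
            System.out.println(request + " -> capacity " + model.capacity
                + ", total held " + model.totalAllocated);
        }
    }
}
```

In this model, growing in small steps holds more memory in total than a single jump to the final size would.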
Drill uses Netty as its memory allocator. Because of the way Netty works, code should avoid allocating buffers larger than 16 MB. Such a large size is unlikely for VARCHAR
columns, but possible for VARBINARY
columns if you happen to be working with images, audio or other binary blob types.
We will need the following ByteBuf
methods:
public ByteBuf setByte(int index, int value);
public void setByte(int index, byte b);
public ByteBuf setBytes(int index, byte[] src);
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);
The ByteBuf
return values are an artifact of the fact that DrillBuf
extends Netty's ByteBuf
class. Normally we don't use these values.
Let's put all this together to write a function that returns the set of letter-like characters rather than just counting them.
@FunctionTemplate(
name = "extractLetters",
scope = FunctionScope.SIMPLE,
nulls = NullHandling.NULL_IF_NULL)
public static class ExtractLettersFunction implements DrillSimpleFunc {
@Param VarCharHolder input;
@Output VarCharHolder output;
@Inject DrillBuf outputBuf;
@Override
public void setup() { }
@Override
public void eval() {
// Convert the input to a string.
String value = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(input);
// Build up the output string.
StringBuilder result = new StringBuilder();
for (int i = 0; i < value.length(); i++) {
char c = value.charAt(i);
if (Character.isAlphabetic(c)) {
result.append(c);
}
}
// Convert to the output buffer.
byte outBytes[] = result.toString().getBytes(com.google.common.base.Charsets.UTF_8);
outputBuf = outputBuf.reallocIfNeeded(outBytes.length);
outputBuf.setBytes(0, outBytes);
output.buffer = outputBuf;
output.start = 0;
output.end = outBytes.length;
}
}
Let's focus on the eval()
method, as the rest has already been covered. eval()
divides into three blocks. The first we've already discussed, the second is just plain Java code. The third is where we must focus. We start by converting our output String to an array of UTF-8 bytes:
byte outBytes[] = result.toString().getBytes(com.google.common.base.Charsets.UTF_8);
Next, we reallocate our working buffer to hold our data. The buffer is reused, so we need to reallocate memory only if our needs are larger than the current size:
outputBuf = outputBuf.reallocIfNeeded(outBytes.length);
Note that reallocIfNeeded() returns a new buffer if reallocation occurs. If we forget to assign it back to our outputBuf, we'll continue working with the old buffer, and we may cause the query to fail with an IndexOutOfBoundsException.
As it turns out, Drill's generated code is designed to handle the case in which we replace the buffer. Here is an excerpt of the generated code:
public class ProjectorGen0 {

// The injected buffer
DrillBuf work0;
...
final NullableVarCharHolder output = new NullableVarCharHolder();
NullableVarCharHolder input = out4;
// Give the buffer to our function
DrillBuf outputBuf = work0;

// The inlined body of our function
CountLettersFunctions$ExtractLettersFunction_eval: {
String value = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(input);
...
// We replace the buffer
outputBuf = outputBuf.reallocIfNeeded(outBytes.length);
...
}

// Drill copies our newly-created buffer
// into its own holding variable.
work0 = outputBuf;
With the buffer allocated, we can copy our UTF-8 bytes into the buffer:
outputBuf.setBytes(0, outBytes);
Finally, we tell the output VarCharHolder
where to find its buffer, and where to find the data within the buffer:
output.buffer = outputBuf;
output.start = 0;
output.end = outBytes.length;
Note the use of outBytes.length
above. We want to set the length to the actual byte count. As it turns out, ByteBuf
can also tell us its length via the capacity()
method, but this will tell us the buffer length which is likely to be greater than the number of bytes.
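The difference between the value length and the buffer capacity can be illustrated without Drill. The sketch below uses a java.nio.ByteBuffer as a stand-in for DrillBuf and a hypothetical Holder class as a stand-in for VarCharHolder; none of these names come from Drill, and the absolute put() and get() calls only approximate what setBytes() and getBytes() do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: ByteBuffer stands in for DrillBuf, Holder for VarCharHolder.
final class OutputLengthDemo {

    static final class Holder {
        int start;
        int end;
        ByteBuffer buffer;
    }

    /** Writes the UTF-8 bytes of result at offset 0 and records their extent. */
    static Holder write(String result, ByteBuffer outputBuf) {
        byte[] outBytes = result.getBytes(StandardCharsets.UTF_8);
        if (outBytes.length > outputBuf.capacity()) {
            throw new IllegalArgumentException("buffer too small; a real UDF would reallocate");
        }
        for (int i = 0; i < outBytes.length; i++) {
            outputBuf.put(i, outBytes[i]); // absolute write, like setBytes(0, ...)
        }
        Holder holder = new Holder();
        holder.buffer = outputBuf;
        holder.start = 0;
        holder.end = outBytes.length; // the byte count, not the capacity
        return holder;
    }

    /** Reads back only the bytes between start and end. */
    static String read(Holder holder) {
        byte[] bytes = new byte[holder.end - holder.start];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = holder.buffer.get(holder.start + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Holder holder = write("aAzZ", ByteBuffer.allocateDirect(256));
        System.out.println(holder.end - holder.start); // prints 4
        System.out.println(holder.buffer.capacity());  // prints 256
        System.out.println(read(holder));              // prints aAzZ
    }
}
```

Had the end position been set to the capacity, the reader would have picked up 252 bytes of unrelated buffer content after the value.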
Let's try the function on the same input data as before:
SELECT word, extractLetters(word) FROM `cp`.`letterInput.json`
aA 12 zZ!,aAzZ
Αθήνα?,Αθήνα
-Mосква-,Mосква
海.,海
Looks like everything works.
The above description does things using just Drill-provided methods and Java. The GitHub repo for this article contains a UdfUtils
class that handles some of the routine work.
public static String varCharInput(VarCharHolder input);
Converts a non-nullable VarChar to a Java String. Same as StringFunctionHelpers.getStringFromVarCharHolder()
.
public static String varCharInput(NullableVarCharHolder input);
Converts a nullable VarChar to a Java String. Same as StringFunctionHelpers.getStringFromVarCharHolder()
, except that if the input VarChar is NULL
, the returned Java String
is null
.
public static DrillBuf varCharOutput(String result,
DrillBuf outputBuf, VarCharHolder output);
Does the work of converting a Java String
to a non-nullable VarChar. Resizes the output buffer if needed. Assign the return value to your output buffer.
public static DrillBuf varCharOutput(String result,
DrillBuf outputBuf, NullableVarCharHolder output);
As above, but handles nullable VarChars. If the input Java String
is null
, sets the output holder to null, else works the same as the non-null version.
Use of the above functions is not free; they will prevent Drill from doing "scalar replacement" to eliminate the input or output VarCharHolder, causing a slight performance hit. (The same is true of the Drill-provided functions.) Still, the gain in developer convenience may be worth the runtime performance cost in some cases.
Here is the example extractLetters
function rewritten to use the non-nullable convenience functions:
@Override
public void eval() {
// Convert the input to a string.
String value = org.apache.drill.exec.expr.contrib.udfExample.UdfUtils.varCharInput(input);
...
// Convert to the output buffer.
outputBuf = org.apache.drill.exec.expr.contrib.udfExample.UdfUtils.varCharOutput(
result, outputBuf, output);
}
Everything shown above for VARCHAR
works for VARBINARY
as well. In fact, if we know that the VARBINARY
holds a UTF-8-encoded text field (as can be the case for HBase), we can even convert the binary data to a Java String
. The only difference is that Drill provides no helper function, so we have to do the work ourselves:
@Param VarBinaryHolder input;
...
int len = input.end - input.start;
byte buf[] = new byte[len];
input.buffer.getBytes(input.start, buf);
String value = new String(buf, com.google.common.base.Charsets.UTF_8);
The UdfUtils
class also provides convenience functions for the case that a VARBINARY
holds a string:
public static String varBinaryToString(NullableVarBinaryHolder input);
Testing UDFs is a chore if done directly with the Drill server. We must build the code, copy it to Drill, restart Drill, and run a query. It is much simpler to use JUnit for the detailed testing, then use Drill only for integration testing.
Let's show how to use the Drill test framework to test our UDF that uses VARCHAR
parameters or return values. Such testing is a bit more complex than the earlier case with simple types.
First, let's define a dupIt
function that duplicates a string n
times:
@FunctionTemplate(
name = "dupit",
scope = FunctionScope.SIMPLE,
nulls = NullHandling.NULL_IF_NULL)
// VARCHAR-REQUIRED dupit(VARCHAR-REQUIRED, INT-REQUIRED)
public static class DupItFunction implements DrillSimpleFunc {
@Param VarCharHolder input;
@Param IntHolder count;
@Output VarCharHolder output;
@Inject DrillBuf outputBuf;
@Override
public void setup() { }
@Override
public void eval() {
// Convert the input to a string.
String value = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(input);
// Build up the output string.
StringBuilder result = new StringBuilder();
for (int i = 0; i < count.value; i++) {
result.append(value);
}
// Convert to the output buffer.
byte outBytes[] = result.toString().getBytes(com.google.common.base.Charsets.UTF_8);
outputBuf = outputBuf.reallocIfNeeded(outBytes.length);
outputBuf.setBytes(0, outBytes);
output.buffer = outputBuf;
output.start = 0;
output.end = outBytes.length;
}
}
We need to write some helper functions that simulate how Drill works with our function. First, let's define the test function using a special OperatorFixture
used in Drill unit tests:
@Test
public void testDupItFn() throws Exception {
try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
OperatorContext context = fixture.operatorContext(null);
try {
// Test code goes here
} finally {
context.close();
}
}
}
The fixture, along with the OperatorContext, allows us to create a managed buffer: one that Drill will track for us. (The operator fixture is available in Drill 1.12 and later.) Let's use that feature to create an instance of our function and wire it up to its holders and to an input and output buffer:
private DupItFunction newDupIt(OperatorContext context) {
DupItFunction fn = new DupItFunction();
fn.input = new VarCharHolder();
// Larger than our largest output
fn.input.buffer = context.getManagedBuffer(1024);
fn.count = new IntHolder();
fn.output = new VarCharHolder();
// Default size, function should realloc
fn.outputBuf = context.getManagedBuffer();
fn.setup();
return fn;
}
Next, we need a function to call our function, complete with String-to-buffer-to-String conversions:
private String callDupIt(DupItFunction fn, String value, int count) {
fn.count.value = count;
byte byteVal[] = value.getBytes(Charsets.UTF_8);
fn.input.start = 0;
fn.input.end = byteVal.length;
fn.input.buffer.setBytes(0, byteVal);
fn.eval();
return StringFunctionHelpers.getStringFromVarCharHolder(fn.output);
}
Given those, we can now easily write test cases:
DupItFunction fn = newDupIt(context);
assertEquals("", callDupIt(fn, "x", 0));
assertEquals("xyxyxyxy", callDupIt(fn, "xy", 4));
String result = callDupIt(fn, "0123456789", 1000);
assertEquals(10_000, result.length());
The last case above verifies that we are properly resizing our working buffer when needed.
And there we have it. For the price of a bit of boilerplate code, we can debug our UDF in our IDE without pulling our hair out trying to debug generated code in a Drill server.