Skip to content

Commit

Permalink
Merge branch 'op_manifest' of github.com:ranxianglei/paimon into op_m…
Browse files Browse the repository at this point in the history
…anifest
  • Loading branch information
ranxianglei.rxl committed Nov 12, 2024
2 parents 8c9a75c + 4364ac1 commit dfaeac3
Show file tree
Hide file tree
Showing 68 changed files with 3,067 additions and 203 deletions.
68 changes: 67 additions & 1 deletion docs/content/concepts/table-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ Paimon supports table types:
3. view: metastore required, views in SQL are a kind of virtual table
4. format-table: file format table refers to a directory that contains multiple files of the same format, where
operations on this table allow for reading or writing to these files, compatible with Hive tables
5. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
5. object table: provides metadata indexes for unstructured data objects in the specified object storage directory.
6. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)

## Table with PK
Expand Down Expand Up @@ -169,6 +170,71 @@ CREATE TABLE my_parquet_table (

{{< /tabs >}}

## Object Table

Object Table provides metadata indexes for unstructured data objects in the specified object storage directory.
Object tables allow users to analyze unstructured data in Object Storage:

1. Use the Python API to manipulate this unstructured data, such as converting images to PDF format.
2. Model functions can also be used to perform inference, and the results of these operations can then be combined
   with other structured data in the Catalog.

The object table is managed by the Catalog, and can also provide access control and data lineage management.

{{< tabs "object-table" >}}

{{< tab "Flink-SQL" >}}

```sql
-- Create Object Table

CREATE TABLE `my_object_table` WITH (
'type' = 'object-table',
'object-location' = 'oss://my_bucket/my_location'
);

-- Refresh Object Table

CALL sys.refresh_object_table('mydb.my_object_table');

-- Query Object Table

SELECT * FROM `my_object_table`;

-- Query Object Table with Time Travel

SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */;
```

{{< /tab >}}

{{< tab "Spark-SQL" >}}

```sql
-- Create Object Table

CREATE TABLE `my_object_table` TBLPROPERTIES (
'type' = 'object-table',
'object-location' = 'oss://my_bucket/my_location'
);

-- Refresh Object Table

CALL sys.refresh_object_table('mydb.my_object_table');

-- Query Object Table

SELECT * FROM `my_object_table`;

-- Query Object Table with Time Travel

SELECT * FROM `my_object_table` VERSION AS OF 1;
```

{{< /tab >}}

{{< /tabs >}}

## Materialized Table

Materialized Table is aimed at simplifying both batch and stream data pipelines, providing a consistent development
Expand Down
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@
<td>Double</td>
<td>Define the default false positive probability for lookup cache bloom filters.</td>
</tr>
<tr>
<td><h5>lookup.cache.high-priority-pool-ratio</h5></td>
<td style="word-wrap: break-word;">0.25</td>
<td>Double</td>
<td>The fraction of cache memory that is reserved for high-priority data like index, filter.</td>
</tr>
<tr>
<td><h5>lookup.hash-load-factor</h5></td>
<td style="word-wrap: break-word;">0.75</td>
Expand Down Expand Up @@ -527,6 +533,12 @@
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
<tr>
<td><h5>object-location</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The object location for object table.</td>
</tr>
<tr>
<td><h5>page-size</h5></td>
<td style="word-wrap: break-word;">64 kb</td>
Expand Down Expand Up @@ -904,7 +916,7 @@
<td><h5>type</h5></td>
<td style="word-wrap: break-word;">table</td>
<td><p>Enum</p></td>
<td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table.</li></ul></td>
<td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table combines a normal Paimon table and materialized SQL.</li><li>"object-table": An object table combines a normal Paimon table and an object location.</li></ul></td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.paimon.benchmark.metric.cpu;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
benchmark.addCase(
String.format("cache-%s", cacheType.toString()),
5,
Expand Down
26 changes: 26 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,13 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.parse("256 mb"))
.withDescription("Max memory size for lookup cache.");

public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO =
key("lookup.cache.high-priority-pool-ratio")
.doubleType()
.defaultValue(0.25)
.withDescription(
"The fraction of cache memory that is reserved for high-priority data like index, filter.");

public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
key("lookup.cache.bloom.filter.enabled")
.booleanType()
Expand Down Expand Up @@ -1405,6 +1412,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to enable asynchronous IO writing when writing files.");

public static final ConfigOption<String> OBJECT_LOCATION =
key("object-location")
.stringType()
.noDefaultValue()
.withDescription("The object location for object table.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
Expand Down Expand Up @@ -1516,6 +1529,10 @@ public static Path path(Options options) {
return new Path(options.get(PATH));
}

    /** Returns the {@link TableType} configured by the 'type' option (default is the normal table type). */
    public TableType type() {
        return options.get(TYPE);
    }

    /** Returns the normalized file format identifier from the 'file-format' option. */
    public String formatType() {
        return normalizeFileFormat(options.get(FILE_FORMAT));
    }
Expand Down Expand Up @@ -1565,6 +1582,11 @@ public static FileFormat createFileFormat(Options options, ConfigOption<String>
return FileFormat.fromIdentifier(formatIdentifier, options);
}

    /**
     * Returns the object storage location backing an object table.
     *
     * <p>Only valid for {@link TableType#OBJECT_TABLE}; any other table type fails the argument
     * check, since 'object-location' has no meaning for it.
     */
    public String objectLocation() {
        checkArgument(type() == TableType.OBJECT_TABLE, "Only object table has object location!");
        return options.get(OBJECT_LOCATION);
    }

public Map<Integer, String> fileCompressionPerLevel() {
Map<String, String> levelCompressions = options.get(FILE_COMPRESSION_PER_LEVEL);
return levelCompressions.entrySet().stream()
Expand Down Expand Up @@ -1837,6 +1859,10 @@ public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}

    /**
     * Returns the fraction of lookup cache memory reserved for high-priority data like index and
     * filter.
     */
    public double lookupCacheHighPrioPoolRatio() {
        return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
    }

public long targetFileSize(boolean hasPrimaryKey) {
return options.getOptional(TARGET_FILE_SIZE)
.orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB)
Expand Down
7 changes: 6 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/TableType.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ public enum TableType implements DescribedEnum {
FORMAT_TABLE(
"format-table",
"A file format table refers to a directory that contains multiple files of the same format."),
MATERIALIZED_TABLE("materialized-table", "A materialized table.");
MATERIALIZED_TABLE(
"materialized-table",
"A materialized table combines normal Paimon table and materialized SQL."),
OBJECT_TABLE(
"object-table", "A object table combines normal Paimon table and object location.");

private final String value;
private final String description;

Expand Down
32 changes: 32 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -442,4 +446,32 @@ public static BinaryRow singleColumn(@Nullable BinaryString string) {
writer.complete();
return row;
}

/**
* If it is a fixed-length field, we can call this BinaryRowData's setXX method for in-place
* updates. If it is variable-length field, can't use this method, because the underlying data
* is stored continuously.
*/
public static boolean isInFixedLengthPart(DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
case DECIMAL:
return Decimal.isCompact(((DecimalType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return Timestamp.isCompact(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return Timestamp.isCompact(((LocalZonedTimestampType) type).getPrecision());
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,98 @@ interface FieldGetter extends Serializable {
@Nullable
Object getFieldOrNull(InternalRow row);
}

    /**
     * Creates a {@link FieldSetter} that copies the field at {@code fieldPos} from a source row
     * into a target row at the same position.
     *
     * <p>The type dispatch happens once, at creation time; the per-record path is a single typed
     * get/set pair. Only types that can be set in place are supported — any other type root falls
     * through to the default branch and throws.
     *
     * @param fieldType the type of the field to copy
     * @param fieldPos the position of the field in both source and target rows
     * @throws IllegalArgumentException if the field type does not support in-place setting
     */
    static FieldSetter createFieldSetter(DataType fieldType, int fieldPos) {
        final FieldSetter fieldSetter;
        // ordered by type root definition
        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
                fieldSetter = (from, to) -> to.setBoolean(fieldPos, from.getBoolean(fieldPos));
                break;
            case DECIMAL:
                final int decimalPrecision = getPrecision(fieldType);
                final int decimalScale = getScale(fieldType);
                fieldSetter =
                        (from, to) ->
                                to.setDecimal(
                                        fieldPos,
                                        from.getDecimal(fieldPos, decimalPrecision, decimalScale),
                                        decimalPrecision);
                if (fieldType.isNullable() && !Decimal.isCompact(decimalPrecision)) {
                    // Non-compact decimals: besides marking the null bit we also write a null
                    // decimal for the slot. NOTE(review): presumably this keeps the fixed-length
                    // slot bytes consistent for BinaryRow-style storage — confirm for other
                    // DataSetters implementations.
                    return (from, to) -> {
                        if (from.isNullAt(fieldPos)) {
                            to.setNullAt(fieldPos);
                            to.setDecimal(fieldPos, null, decimalPrecision);
                        } else {
                            fieldSetter.setFieldFrom(from, to);
                        }
                    };
                }
                break;
            case TINYINT:
                fieldSetter = (from, to) -> to.setByte(fieldPos, from.getByte(fieldPos));
                break;
            case SMALLINT:
                fieldSetter = (from, to) -> to.setShort(fieldPos, from.getShort(fieldPos));
                break;
            case INTEGER:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                // DATE and TIME values are read and written as int.
                fieldSetter = (from, to) -> to.setInt(fieldPos, from.getInt(fieldPos));
                break;
            case BIGINT:
                fieldSetter = (from, to) -> to.setLong(fieldPos, from.getLong(fieldPos));
                break;
            case FLOAT:
                fieldSetter = (from, to) -> to.setFloat(fieldPos, from.getFloat(fieldPos));
                break;
            case DOUBLE:
                fieldSetter = (from, to) -> to.setDouble(fieldPos, from.getDouble(fieldPos));
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int timestampPrecision = getPrecision(fieldType);
                fieldSetter =
                        (from, to) ->
                                to.setTimestamp(
                                        fieldPos,
                                        from.getTimestamp(fieldPos, timestampPrecision),
                                        timestampPrecision);
                if (fieldType.isNullable() && !Timestamp.isCompact(timestampPrecision)) {
                    // Same null-slot treatment as non-compact decimals above.
                    return (from, to) -> {
                        if (from.isNullAt(fieldPos)) {
                            to.setNullAt(fieldPos);
                            to.setTimestamp(fieldPos, null, timestampPrecision);
                        } else {
                            fieldSetter.setFieldFrom(from, to);
                        }
                    };
                }
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("type %s not support for setting", fieldType));
        }
        if (!fieldType.isNullable()) {
            return fieldSetter;
        }
        // Generic wrapper for nullable compact/primitive fields: only the null bit needs setting.
        return (from, to) -> {
            if (from.isNullAt(fieldPos)) {
                to.setNullAt(fieldPos);
            } else {
                fieldSetter.setFieldFrom(from, to);
            }
        };
    }

    /** Accessor for setting the field of a row during runtime. */
    interface FieldSetter extends Serializable {
        /** Copies the configured field from {@code from} into {@code to}. */
        void setFieldFrom(DataGetters from, DataSetters to);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public BinaryRow toBinaryRow(BinaryRow rowData) throws IOException {

// ============================ Page related operations ===================================

    /** Creates a fresh {@link BinaryRow} with this serializer's field count, intended for reuse. */
    @Override
    public BinaryRow createReuseInstance() {
        return new BinaryRow(numFields);
    }

@Override
public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public BinaryRow toBinaryRow(InternalRow row) {
return reuseRow;
}

    /** Returns a reusable row instance created by the underlying binary serializer. */
    @Override
    public InternalRow createReuseInstance() {
        return binarySerializer.createReuseInstance();
    }

@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/** A type serializer which provides paged serialize and deserialize methods. */
public interface PagedTypeSerializer<T> extends Serializer<T> {

    /**
     * Creates a new instance intended for reuse, so callers can avoid allocating a fresh object
     * per record during (de)serialization.
     */
    T createReuseInstance();

/**
* Serializes the given record to the given target paged output view. Some implementations may
* skip some bytes if current page does not have enough space left, .e.g {@link BinaryRow}.
Expand Down
Loading

0 comments on commit dfaeac3

Please sign in to comment.