Skip to content

Commit

Permalink
Flink: backport PR apache#10331 and PR apache#10457 (apache#10757)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu authored Jul 26, 2024
1 parent ec2c2e9 commit 4dbc7f5
Show file tree
Hide file tree
Showing 90 changed files with 8,396 additions and 3,298 deletions.
2 changes: 2 additions & 0 deletions flink/v1.17/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.slf4j'
}

implementation libs.datasketches

testImplementation libs.flink117.connector.test.utils
testImplementation libs.flink117.core
testImplementation libs.flink117.runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink.shuffle;

import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -28,6 +29,8 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -67,6 +70,8 @@ public class MapRangePartitionerBenchmark {
Types.NestedField.required(9, "name9", Types.StringType.get()));

private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
private static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);

private MapRangePartitioner partitioner;
Expand All @@ -83,10 +88,11 @@ public void setupBenchmark() {
mapStatistics.put(sortKey, weight);
});

MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
MapAssignment mapAssignment =
MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARTOR);
this.partitioner =
new MapRangePartitioner(
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);

List<Integer> keys = Lists.newArrayList(weights.keySet().iterator());
long[] weightsCDF = new long[keys.size()];
Expand Down

This file was deleted.

Loading

0 comments on commit 4dbc7f5

Please sign in to comment.