
[FLINK] [#2331] Add support to partition table by date type #3533

Merged

Conversation

tuliocavalcanti
Contributor

Which Delta project/connector is this regarding?

  • [ ] Spark
  • [ ] Standalone
  • [x] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Adds support for partitioning tables by a DATE-type column for RowData elements.
Resolves #2331
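For context on what the change has to handle: Flink's RowData represents a DATE value internally as an int counting days since the Unix epoch, while a Delta partition value is ultimately rendered as a string. A minimal sketch of that mapping, using a hypothetical toPartitionValue helper (illustrative only, not the connector's actual code):

```java
import java.time.LocalDate;

public class DatePartitionSketch {
    // Flink stores DATE values in RowData as an int: days since 1970-01-01.
    // A Delta partition value is a string, so the int must be rendered back
    // into an ISO-8601 date. This helper is illustrative only.
    static String toPartitionValue(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch).toString();
    }

    public static void main(String[] args) {
        System.out.println(toPartitionValue(0));     // 1970-01-01
        System.out.println(toPartitionValue(19967)); // 2024-09-01
    }
}
```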

How was this patch tested?

Tests were updated to include the new type.

Does this PR introduce any user-facing changes?

No

Collaborator

@scottsand-db scottsand-db left a comment


Looks great! Can you please add a simple test that writes multiple different date values?

If you need help or don't know how -- no worries, just let me know and I can take care of it!

@scottsand-db
Collaborator

Hi @tuliocavalcanti - friendly follow up?

@tuliocavalcanti
Contributor Author

Hi Scott,
sorry for the delay, I had a tight deadline to deliver this week.

Could you please provide some example dates that you want tested?
I didn't do much testing because I'm using Flink's internal date converter, DateDateConverter, and expect it to be properly tested already.

@scottsand-db
Collaborator

Hi @tuliocavalcanti - I'd like an end-to-end test case that streams and writes different date values to a table, e.g. something as simple as starting with 2024-09-01 and writing values until 2024-10-01.

@tuliocavalcanti
Contributor Author

Sure thing, is there another test case that I can use as a guide?
I'm not sure where to put this test; I'm assuming it's not in this TestDeltaBucketAssigner file.

@scottsand-db
Collaborator

@tuliocavalcanti something like this?

package io.delta.flink.sink;

import io.delta.flink.utils.DeltaTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.Arrays;

public class DeltaSinkSuite {

    @Test
    public void testWritePartitionedByDate(@TempDir Path tempDir) throws Exception {
        final String deltaTablePath = tempDir.toString();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final RowType rowType = new RowType(Arrays.asList(
            new RowType.RowField("part1", new DateType()),
            new RowType.RowField("data1", new IntType())
        ));

        final DataFormatConverters.DataFormatConverter<RowData, Row> typeConverter =
            DataFormatConverters.getConverterForDataType(
                TypeConversions.fromLogicalToDataType(rowType)
            );

        final DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                new org.apache.flink.core.fs.Path(deltaTablePath),
                DeltaTestUtils.getHadoopConf(),
                rowType
            )
            // partition by the DATE column so the new code path is exercised
            .withPartitionColumns("part1")
            .build();

        // Write several distinct dates so more than one partition is created.
        final DataStream<RowData> inputStream = env.fromElements(
            typeConverter.toInternal(Row.of(LocalDate.of(2024, 1, 1), 0)),
            typeConverter.toInternal(Row.of(LocalDate.of(2024, 1, 2), 1)),
            typeConverter.toInternal(Row.of(LocalDate.of(2024, 1, 3), 2))
        );

        inputStream.sinkTo(deltaSink);

        env.execute("Delta Sink Example");
    }
}
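The test above could be widened to the month-long range discussed earlier in the thread. A minimal, Flink-free sketch of generating that range (the datesBetween helper is hypothetical, not part of the connector; each resulting LocalDate would become one Row fed through the converter):

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

public class DateRangeSketch {
    // Collect every date from start through endInclusive, one per day.
    static List<LocalDate> datesBetween(LocalDate start, LocalDate endInclusive) {
        List<LocalDate> dates = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(endInclusive); d = d.plusDays(1)) {
            dates.add(d);
        }
        return dates;
    }

    public static void main(String[] args) {
        // The range suggested in the review: 2024-09-01 through 2024-10-01.
        List<LocalDate> dates = datesBetween(
            LocalDate.of(2024, 9, 1), LocalDate.of(2024, 10, 1));
        System.out.println(dates.size()); // 31 (30 days of September + Oct 1)
    }
}
```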

@tuliocavalcanti
Contributor Author

Hi Scott, just double-checking: you folks will merge it when it's time, right?

@scottsand-db scottsand-db merged commit 6c22aca into delta-io:master Sep 10, 2024
17 checks passed
@scottsand-db scottsand-db added this to the 3.3.0 milestone Sep 10, 2024
@tuliocavalcanti tuliocavalcanti deleted the flink/support-date-partition branch September 10, 2024 17:31
Successfully merging this pull request may close these issues.

[BUG][Flink] Caused by: java.lang.RuntimeException: Type not supported DATE
5 participants