Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 9, 2024
1 parent 192e0ee commit 2edba9e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public interface SeaTunnelRowDeserializer<Input> {

void deserialize(Input record, Collector<SeaTunnelRow> output);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.tikv.kvproto.Kvrpcpb;

/** Deserialize snapshot data */
public class SeaTunnelRowSnapshotRecordDeserializer
implements SeaTunnelRowDeserializer<Kvrpcpb.KvPair> {
@Override
public void deserialize(Kvrpcpb.KvPair record, Collector<SeaTunnelRow> output) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.tikv.kvproto.Cdcpb;

public class SeaTunnelRowStreamingRecordDeserializer
implements SeaTunnelRowDeserializer<Cdcpb.Event.Row> {

@Override
public void deserialize(Cdcpb.Event.Row record, Collector<SeaTunnelRow> output) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.config.TiDBSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowSnapshotRecordDeserializer;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.deserializer.SeaTunnelRowStreamingRecordDeserializer;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.split.TiDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.cdc.tidb.source.utils.TableKeyRangeUtils;

Expand Down Expand Up @@ -62,6 +64,9 @@ public class TiDBSourceReader implements SourceReader<SeaTunnelRow, TiDBSourceSp
private final Map<TiDBSourceSplit, Long> resolvedTsStates;
private final Map<TiDBSourceSplit, ByteString> snapshotOffsets;

private final SeaTunnelRowSnapshotRecordDeserializer snapshotRecordDeserializer;
private final SeaTunnelRowStreamingRecordDeserializer streamingRecordDeserializer;

/** Task local variables. */
private transient TiSession session = null;

Expand All @@ -85,6 +90,9 @@ public TiDBSourceReader(Context context, TiDBSourceConfig config) {
// use queue to separate read and write to ensure pull event unblock.
// since sink jdbc is slow, 5000W queue size may be safe size.
this.committedEvents = new LinkedBlockingQueue<>();

this.snapshotRecordDeserializer = new SeaTunnelRowSnapshotRecordDeserializer();
this.streamingRecordDeserializer = new SeaTunnelRowStreamingRecordDeserializer();
}

/** Open the source reader. */
Expand Down Expand Up @@ -119,18 +127,19 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
if (config.getStartupMode() == StartupMode.INITIAL) {
for (TiDBSourceSplit sourceSplit : sourceSplits) {
if (!sourceSplit.isSnapshotCompleted()) {
snapshotEvents(sourceSplit);
snapshotEvents(sourceSplit, output);
}
// completed snapshot
snapshotCompleted.add(sourceSplit);
}
}
for (TiDBSourceSplit sourceSplit : sourceSplits) {
captureStreamingEvents(sourceSplit);
captureStreamingEvents(sourceSplit, output);
}
}

protected void snapshotEvents(TiDBSourceSplit split) throws Exception {
protected void snapshotEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
throws Exception {
log.info("read snapshot events");
Coprocessor.KeyRange keyRange = split.getKeyRange();
long resolvedTs = split.getResolvedTs();
Expand All @@ -147,10 +156,7 @@ protected void snapshotEvents(TiDBSourceSplit split) throws Exception {
}
for (final Kvrpcpb.KvPair pair : segment) {
if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
/**
* 解析数据并发送 snapshotEventDeserializationSchema.deserialize(pair,
* outputCollector);
*/
this.snapshotRecordDeserializer.deserialize(pair, output);
}
}
start =
Expand All @@ -162,7 +168,8 @@ protected void snapshotEvents(TiDBSourceSplit split) throws Exception {
}
}

protected void captureStreamingEvents(TiDBSourceSplit split) throws Exception {
protected void captureStreamingEvents(TiDBSourceSplit split, Collector<SeaTunnelRow> output)
throws Exception {
long resolvedTs = resolvedTsStates.get(split);
if (resolvedTs >= STREAMING_VERSION_START_EPOCH) {
log.info("Capture streaming event from resolvedTs:{}", resolvedTs);
Expand All @@ -175,25 +182,20 @@ protected void captureStreamingEvents(TiDBSourceSplit split) throws Exception {
client.start(finalResolvedTs);
return client;
});
for (int i = 0; i < 1000; i++) {
final Cdcpb.Event.Row row = cdcClient.get();
if (row == null) {
break;
}
handleRow(row);
final Cdcpb.Event.Row row = cdcClient.get();
if (row == null) {
return;
}
handleRow(row);
resolvedTs = cdcClient.getMaxResolvedTs();
if (commits.size() > 0) {
flushRows(resolvedTs);
}
}

// ouput data
while (!committedEvents.isEmpty()) {
Cdcpb.Event.Row committedRow = committedEvents.take();
/**
* 解析数据发送 changeEventDeserializationSchema.deserialize(committedRow, outputCollector);
*/
Cdcpb.Event.Row row = committedEvents.take();
this.streamingRecordDeserializer.deserialize(row, output);
}
}

Expand Down

0 comments on commit 2edba9e

Please sign in to comment.