Skip to content

Commit

Permalink
[INLONG-11700][SDK] Optimize TCP message reporting Sender implementat…
Browse files Browse the repository at this point in the history
…ion (#11701)

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 21, 2025
1 parent bfbeae2 commit 16084cc
Show file tree
Hide file tree
Showing 10 changed files with 1,462 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,20 @@ public enum ErrorCode {
PARSE_ENCRYPT_META_EXCEPTION(52, "Parse encrypt content failure"),
META_REQUIRED_FIELD_NOT_EXIST(53, "Required meta field not exist"),
META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),

//
FETCH_PROXY_META_FAILURE(59, "Fetch dataproxy meta info failure"),
FETCH_ENCRYPT_META_FAILURE(60, "Fetch encrypt meta info failure"),
//
NO_NODE_META_INFOS(81, "No proxy node metadata info in local"),
EMPTY_ACTIVE_NODE_SET(82, "Empty active node set"),
EMPTY_WRITABLE_NODE_SET(83, "Empty writable node set"),
NO_VALID_REMOTE_NODE(84, "No valid remote node set"),
//
REPORT_INFO_EXCEED_MAX_LEN(91, "Report info exceed max allowed length"),
ENCODE_BODY_EXCEPTION(92, "Encode body exception"),
COMPRESS_BODY_EXCEPTION(93, "Compress body exception"),
ENCRYPT_BODY_EXCEPTION(94, "Encrypt body exception"),
GENERATE_SIGNATURE_EXCEPTION(95, "Generate signature exception"),
//
CONNECTION_UNAVAILABLE(111, "Connection unavailable"),
CONNECTION_BREAK(112, "Connection break"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ public class SdkConsts {
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;

public static final int EXT_FIELD_FLAG_DISABLE_ID2NUM = 1 << 2;
public static final int EXT_FIELD_FLAG_SEP_BY_LF = 1 << 5;

public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;

/* Reserved attribute data size(bytes). */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.network.tcp;

import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.network.tcp.codec.DecodeObject;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TCP client handler class
*
* Used to process TCP response message.
*/
public class ClientHandler extends SimpleChannelInboundHandler<DecodeObject> {

private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 * 1000L);
private static final LogCounter thrownCnt = new LogCounter(10, 100000, 60 * 1000L);

private final TcpClientMgr tcpClientMgr;

public ClientHandler(TcpClientMgr tcpClientMgr) {
this.tcpClientMgr = tcpClientMgr;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, DecodeObject decObject) {
if (decObject.getMsgType() != MsgType.MSG_BIN_HEARTBEAT) {
tcpClientMgr.feedbackMsgResponse(ctx.channel().toString(), decObject);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
if (exceptCnt.shouldPrint()) {
logger.warn("ClientHandler({})'s channel {} has error!",
tcpClientMgr.getSenderId(), ctx.channel(), e);
}
try {
tcpClientMgr.setChannelFrozen(ctx.channel().toString());
} catch (Throwable ex) {
if (thrownCnt.shouldPrint()) {
logger.warn("ClientHandler({}) exceptionCaught throw exception",
tcpClientMgr.getSenderId(), ex);
}
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
if (logger.isDebugEnabled()) {
logger.debug("ClientHandler({}) channelDisconnected {}",
tcpClientMgr.getSenderId(), ctx.channel());
}
try {
tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
} catch (Throwable ex) {
if (thrownCnt.shouldPrint()) {
logger.warn("ClientHandler({}) channelInactive throw exception",
tcpClientMgr.getSenderId(), ex);
}
}
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("ClientHandler({}) channelUnregistered {}",
tcpClientMgr.getSenderId(), ctx.channel());
}
try {
tcpClientMgr.notifyChannelDisconnected(ctx.channel().toString());
} catch (Throwable ex) {
if (thrownCnt.shouldPrint()) {
logger.warn("ClientHandler({}) channelUnregistered throw exception",
tcpClientMgr.getSenderId(), ex);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.network.tcp;

import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolDecoder;
import org.apache.inlong.sdk.dataproxy.network.tcp.codec.ProtocolEncoder;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

/**
* TCP client Pipeline Factory class
*
* Used to build TCP pipeline
*/
public class ClientPipelineFactory extends ChannelInitializer<SocketChannel> {

private final TcpClientMgr tcpClientMgr;

public ClientPipelineFactory(TcpClientMgr tcpClientMgr) {
this.tcpClientMgr = tcpClientMgr;
}

@Override
public void initChannel(SocketChannel ch) throws Exception {

// Setup channel except for the SsHandler for TLS enabled connections

ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
65536, 0, 4, 0, 0));

ch.pipeline().addLast("contentDecoder", new ProtocolDecoder());
ch.pipeline().addLast("contentEncoder", new ProtocolEncoder());
ch.pipeline().addLast("handler", new ClientHandler(tcpClientMgr));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.network.tcp;

/**
* Send Qos class
*
* Used to identify different send type
*/
public enum SendQos {

NO_ACK, // only request, without response
SOURCE_ACK, // request and return a response upon receipt by DataProxy, default
SINK_ACK // request, and return a response after successful forwarding of the request by DataProxy
}
Loading

0 comments on commit 16084cc

Please sign in to comment.