Skip to content

Commit

Permalink
[INLONG-11670][SDK] Rename the ProxysdkException class name to ProxyS…
Browse files Browse the repository at this point in the history
…dkException (#11671)

* [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException

* [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException

---------

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 15, 2025
1 parent 9995c22 commit 004c2be
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -271,7 +271,7 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {

private void asyncSendByMessageSender(SendMessageCallback cb,
List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID,
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException {
sender.asyncSendMessage(cb, bodyList, groupId,
streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public interface AttributeConstants {
/* from where */
String FROM = "f";

/* msg uuid */
String MSG_UUID = "msgUUID";

// whether to return a response, false: not need, true or not exist: need
String MESSAGE_IS_ACK = "isAck";

Expand Down Expand Up @@ -101,11 +104,14 @@ public interface AttributeConstants {

// Message reporting time, in milliseconds
// Provided by the initial sender of the data, and passed to
// the downstream by the Bus without modification for the downstream to
// the downstream by the DataProxy without modification for the downstream to
// calculate the end-to-end message delay; if this field does not exist in the request,
// it will be added by the Bus with the current time
// it will be added by the DataProxy with the current time
String MSG_RPT_TIME = "rtms";

// inlong sdk version
String PROXY_SDK_VERSION = "sdkVersion";

// Audit version is used for audit to reconciliation
String AUDIT_VERSION = "auditVersion";
}
22 changes: 22 additions & 0 deletions inlong-sdk/dataproxy-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,28 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.github.git-commit-id</groupId>
<artifactId>git-commit-id-maven-plugin</artifactId>
<version>4.9.9</version>
<configuration>
<generateGitPropertiesFile>true</generateGitPropertiesFile>
<generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
<includeOnlyProperties>
<includeOnlyProperty>^git.build.(version)$</includeOnlyProperty>
</includeOnlyProperties>
<commitIdGenerationMode>full</commitIdGenerationMode>
</configuration>
<executions>
<execution>
<id>get-the-git-infos</id>
<goals>
<goal>revision</goal>
</goals>
<phase>initialize</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
Expand Down Expand Up @@ -443,25 +443,25 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {
String msgUUID, Map<String, String> extraAttrMap) throws ProxySdkException {
asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException {
String msgUUID) throws ProxySdkException {
asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, false);
}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID) throws ProxysdkException {
long dt, String msgUUID) throws ProxySdkException {
asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, false);
}

@Override
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException {
long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxySdkException {
asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false);
}

Expand Down Expand Up @@ -529,16 +529,16 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
* @param dt data report timestamp
* @param msgUUID msg uuid
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId,
String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException {
String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, 1);

Expand Down Expand Up @@ -584,16 +584,16 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g
* @param msgUUID msg uuid
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, 1);
if (isProxySend) {
Expand Down Expand Up @@ -635,16 +635,16 @@ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String g
* @param dt data report time
* @param msgUUID msg uuid
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException {
String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
String proxySend = "";
Expand Down Expand Up @@ -690,18 +690,18 @@ public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList
* @param msgUUID msg uuid
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
if (isProxySend) {
Expand Down Expand Up @@ -738,11 +738,11 @@ public void asyncSendMessage(SendMessageCallback callback,
* @param inlongStreamId
* @param body
* @param callback
* @throws ProxysdkException
* @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback)
throws ProxysdkException {
throws ProxySdkException {
this.asyncSendMessage(callback, body, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId());
}
Expand All @@ -755,10 +755,10 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[]
* @param body a single message
* @param callback callback can be null
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback,
boolean isProxySend) throws ProxysdkException {
boolean isProxySend) throws ProxySdkException {
this.asyncSendMessage(callback, body, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend);
}
Expand All @@ -770,11 +770,11 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[]
* @param inlongStreamId streamId
* @param bodyList list of messages
* @param callback callback can be null
* @throws ProxysdkException
* @throws ProxySdkException
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException {
SendMessageCallback callback) throws ProxySdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId());
}
Expand All @@ -787,10 +787,10 @@ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<b
* @param bodyList list of messages
* @param callback callback can be null
* @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
* @throws ProxysdkException
* @throws ProxySdkException
*/
public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
SendMessageCallback callback, boolean isProxySend) throws ProxysdkException {
SendMessageCallback callback, boolean isProxySend) throws ProxySdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,7 +79,7 @@ SendResult sendMessage(List<byte[]> bodyList, String groupId,
*/
void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String msgUUID,
Map<String, String> extraAttrMap) throws ProxysdkException;
Map<String, String> extraAttrMap) throws ProxySdkException;

/**
* This method provides an asynchronized function which you want to send data without packing
Expand All @@ -89,7 +89,7 @@ void asyncSendMessage(SendMessageCallback callback,
* @param body The data will be sent
*/
void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException;
byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException;

/**
* This method provides an asynchronized function which you want to send data with packing
Expand All @@ -98,7 +98,7 @@ void asyncSendMessage(SendMessageCallback callback,
* @param bodyList The data will be sent,which is a collection consisting of byte arrays
*/
void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException;
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException;

/**
* This method provides an asynchronized function which you want to send data with packing
Expand All @@ -111,7 +111,7 @@ void asyncSendMessage(SendMessageCallback callback,
*/
void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
Map<String, String> extraAttrMap) throws ProxysdkException;
Map<String, String> extraAttrMap) throws ProxySdkException;

/**
* This method provides an asynchronized function which you want to send data.<br>
Expand All @@ -121,10 +121,10 @@ void asyncSendMessage(SendMessageCallback callback,
* @param inlongStreamId
* @param body
* @param callback callback can be null
* @throws ProxysdkException
* @throws ProxySdkException
*/
void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback)
throws ProxysdkException;
throws ProxySdkException;

/**
* This method provides an asynchronized function which you want to send datas.<br>
Expand All @@ -134,9 +134,9 @@ void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body,
* @param inlongStreamId
* @param bodyList
* @param callback callback can be null
* @throws ProxysdkException
* @throws ProxySdkException
*/
void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException;
SendMessageCallback callback) throws ProxySdkException;

}
Loading

0 comments on commit 004c2be

Please sign in to comment.