Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11695][SDK] MessageSender related interfaces abstraction #11696

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;

import java.util.concurrent.ConcurrentHashMap;

/**
* Client Manager interface
*
* Used to Manager network client
*/
public interface ClientMgr {

/**
* Start network client manager
*
* @param procResult the start result, return detail error infos if sending fails
* @return true if successful, false return indicates failure.
*/
boolean start(ProcessResult procResult);

/**
* Stop network client manager
*
*/
void stop();

/**
* Get the number of proxy nodes currently in use
*
* @return Number of nodes in use
*/
int getActiveNodeCnt();

/**
* Get the number of in-flight messages
*
* @return Number of in-flight messages
*/
int getInflightMsgCnt();

/**
* Update cached proxy nodes
*
* @param nodeChanged whether the updated node has changed
* @param hostInfoMap the new proxy nodes
*/
void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap<String, HostInfo> hostInfoMap);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;

/**
* Message Sender interface
*
* Used to define the sender common methods
*/
public interface MessageSender {

/**
* Start sender when the sender is built
*
* <p>Attention:
* if return false, the caller need to handle it based on the error code and
* error information returned by procResult, such as:
* prompting the user, retrying after some time, etc.
* </p>
*
* @param procResult the start result, return detail error infos if sending fails
* @return true if successful, false return indicates that the sender fails to start.
*/
boolean start(ProcessResult procResult);

/**
* Close the sender when need to stop the sender's sending service.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;

/**
* Message Send Callback interface
*
* Used to define the send callback methods
*/
public interface MsgSendCallback {

/**
* Invoked when a message is confirmed by DataProxy
*
* @param result The event process result, include detail error infos if sending fails
*/
void onMessageAck(ProcessResult result);

/**
* Invoked when a message transportation interrupted by an exception
*
* @param ex The exception info
*/
void onException(Throwable ex);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.http;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;

/**
* HTTP Message Sender interface
*
* Used to define the HTTP message sender methods
*/
public interface HttpMsgSender extends MessageSender {

/**
* Synchronously send message and wait for the final sending result
*
* <p>Attention: if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.</p>
*
* @param eventInfo the event information need to send
* @param procResult The send result, including the detail error infos if failed
* @return true if successful, false if failed for some reason.
*/
boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult procResult);

/**
* Asynchronously send message
*
* <p>Attention: if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.</p>
*
* @param eventInfo the event information need to send
* @param callback the callback that returns the response from DataProxy or
* an exception that occurred while waiting for the response.
* @param procResult The send result, including the detail error infos if the event not accepted
* @return true if successful, false if the event not accepted for some reason.
*/
boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.tcp;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;

/**
* TCP Message Sender interface
*
* Used to define the TCP message sender methods
*/
public interface TcpMsgSender extends MessageSender {

/**
* Send message without response
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method may lose messages. It is suitable for situations where the reporting volume is very large,
* the business does not pay attention to the final reporting results, and
* the message loss is tolerated in the event of an exception.
* </p>
*
* @param eventInfo the event information need to send
* @param procResult The send result, include the detail error infos if the eventInfo is not accepted
* @return true if successful, false return indicates that the eventInfo was not accepted for some reason.
*/
boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult procResult);

/**
* Synchronously send message and wait for the final sending result
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there
* may be duplicate messages or message loss scenarios. It is suitable for scenarios with
* a very large number of reports, very low reporting time requirements, and
* the need to return the sending results.
* 3. this method, with sendInB2B = false, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small amount of reports and
* no requirements on the reporting time, but require DataProxy to forward messages with high reliability.
* </p>
*
* @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns
* as soon as it receives the request and forwards the message in B2B mode until it succeeds;
* false indicates DataProxy returns after receiving the request and forwarding it successfully,
* and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param procResult The send result, including the detail error infos if failed
* @return true if successful, false if failed for some reason.
*/
boolean syncSendMessage(boolean sendInB2B,
TcpEventInfo eventInfo, ProcessResult procResult);

/**
* Asynchronously send message
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there
* may be duplicate messages or message loss scenarios. It is suitable for scenarios with
* a very large number of reports, very low reporting time requirements, and
* the need to return the sending results.
* 3. this method, with sendInB2B = false, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small amount of reports and
* no requirements on the reporting time, but require DataProxy to forward messages with high reliability.
* </p>
*
* @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns
* as soon as it receives the request and forwards the message in B2B mode until it succeeds;
* false indicates DataProxy returns after receiving the request and forwarding it successfully,
* and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param callback the callback that returns the response from DataProxy or
* an exception that occurred while waiting for the response.
* @param procResult The send result, including the detail error infos if the event not accepted
* @return true if successful, false if the event not accepted for some reason.
*/
boolean asyncSendMessage(boolean sendInB2B,
TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult);
}
Loading