Skip to content

Commit

Permalink
and timeout and retry straegy
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Dec 23, 2024
1 parent 7326a64 commit 48b4133
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private static String send(DorisOptions options, DorisReadOptions readOptions, H
try {
String response;
if (request instanceof HttpGet) {
response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
response = getConnectionGet(request, options.getUsername(), options.getPassword(), logger);
} else {
response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
}
Expand Down Expand Up @@ -162,6 +162,8 @@ private static String getConnectionPost(HttpRequestBase request, String user, St
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
conn.setConnectTimeout(request.getConfig().getConnectTimeout());
conn.setReadTimeout(request.getConfig().getSocketTimeout());
PrintWriter out = new PrintWriter(conn.getOutputStream());
// send request params
out.print(res);
Expand All @@ -171,13 +173,15 @@ private static String getConnectionPost(HttpRequestBase request, String user, St
return parseResponse(conn, logger);
}

private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
URL realUrl = new URL(request);
private static String getConnectionGet(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
URL realUrl = new URL(request.getURI().toString());
// open connection
HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);

connection.setConnectTimeout(request.getConfig().getConnectTimeout());
connection.setReadTimeout(request.getConfig().getSocketTimeout());
connection.connect();
return parseResponse(connection, logger);
}
Expand Down Expand Up @@ -346,7 +350,7 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
public static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);
for (String feNode: feNodeList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,9 @@ public boolean isAlive() {
public void setAlive(boolean alive) {
isAlive = alive;
}

public String toBackendString() {
return ip + ":" + httpPort;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -87,7 +89,8 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
private String keysType;

private List<BackendV2.BackendRowV2> backends;
private long pos = 0L;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
Expand All @@ -105,11 +108,13 @@ public DorisDynamicOutputFormat(DorisOptions option,
this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.keysType = parseKeysType();


handleStreamloadProp();
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
}

}

/**
Expand Down Expand Up @@ -186,13 +191,15 @@ public void configure(Configuration configuration) {

@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.backends = settingBackends();
dorisStreamLoad = new DorisStreamLoad(
getBackend(),
backends.get(0).toBackendString(),
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp());
executionOptions.getStreamLoadProp(),
readOptions);
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());

if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
Expand Down Expand Up @@ -326,9 +333,9 @@ public synchronized void flush() throws IOException {
throw new IOException(e);
}
try {
dorisStreamLoad.setHostPort(getBackend());
dorisStreamLoad.setHostPort(getAvailableBackend());
LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
Thread.sleep(1000 * i);
Thread.sleep(1000L * ( i + 1 ));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("unable to flush; interrupted while doing another attempt", e);
Expand All @@ -342,11 +349,35 @@ private String getBackend() throws IOException {
//get be url from fe
return RestService.randomBackend(options, readOptions, LOG);
} catch (IOException | DorisException e) {
LOG.error("get backends info fail");
LOG.error("get backends info fail", e);
throw new IOException(e);
}
}

private List<BackendV2.BackendRowV2> settingBackends(){
try {
List<BackendV2.BackendRowV2> backendsV2 = RestService.getBackendsV2(options, readOptions, LOG);
if(CollectionUtil.isNullOrEmpty(backendsV2)){
throw new RuntimeException("get no available backend.");
}
return backendsV2;
} catch (Exception e) {
LOG.error("get backends lists fail", e);
throw new RuntimeException(e);
}
}

public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend =
backends.get((int) (pos % backends.size()));
pos++;
return backend.toBackendString();
}
throw new RuntimeException("error cause no available backend.");
}

/**
* Builder for {@link DorisDynamicOutputFormat}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.ConfigurationOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
Expand Down Expand Up @@ -65,17 +68,9 @@ public class DorisStreamLoad implements Serializable {
private String tbl;
private String authEncoding;
private Properties streamLoadProp;
private final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
private CloseableHttpClient httpClient;
private final HttpClientBuilder httpClientBuilder;

public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
Expand All @@ -84,7 +79,22 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = basicAuthHeader(user, passwd);
this.streamLoadProp = streamLoadProp;
this.httpClient = httpClientBuilder.build();
int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
this.httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
})
.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectTimeout)
.setSocketTimeout(socketTimeout)
.build());
}

public String getLoadUrlStr() {
Expand Down Expand Up @@ -134,18 +144,20 @@ private LoadResponse loadBatch(String value) {
StringEntity entity = new StringEntity(value, "UTF-8");
put.setEntity(entity);

try (CloseableHttpResponse response = httpClient.execute(put)) {
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
try (CloseableHttpClient httpClient = httpClientBuilder.build()){
try (CloseableHttpResponse response = httpClient.execute(put)) {
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
return new LoadResponse(statusCode, reasonPhrase, loadResult);
}
return new LoadResponse(statusCode, reasonPhrase, loadResult);
}
} catch (Exception e) {
String err = "failed to stream load data with label: " + label;
LOG.warn(err, e);
LOG.error(err, e);
return new LoadResponse(-1, e.getMessage(), err);
}
}
Expand All @@ -157,14 +169,6 @@ private String basicAuthHeader(String username, String password) {
}

public void close() throws IOException {
if (null != httpClient) {
try {
httpClient.close();
} catch (IOException e) {
LOG.error("Closing httpClient failed.", e);
throw new RuntimeException("Closing httpClient failed.", e);
}
}
}

public static class LoadResponse {
Expand Down

0 comments on commit 48b4133

Please sign in to comment.