From 9519b0260d6cf08188462e07b6e54ff41a1f8246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vladim=C3=ADr=20Dudr?= Date: Wed, 25 Sep 2024 10:56:39 +0200 Subject: [PATCH] support specifying fe protocol --- .../org/apache/doris/flink/rest/RestService.java | 14 +++++++++++--- .../org/apache/doris/flink/sink/BackendUtil.java | 4 +++- .../flink/sink/schema/SchemaChangeManager.java | 2 +- .../apache/doris/flink/rest/TestRestService.java | 10 +++++----- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 1663d4b39..304bfe2d7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -91,8 +91,8 @@ public class RestService implements Serializable { private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; private static final String FE_LOGIN = "/rest/v1/login"; private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; - private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; + private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema"; + private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan"; /** * send request to Doris FE and get response json string. @@ -131,6 +131,7 @@ private static String send( RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) + .setRedirectsEnabled(true) .build(); request.setConfig(requestConfig); @@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger) Collections.shuffle(nodes); for (String feNode : nodes) { String host = feNode.trim(); + if (!host.startsWith("http://") && !host.startsWith("https://")) { + host = "http://" + host; + } if (BackendUtil.tryHttpConnection(host)) { return host; } @@ -359,7 +363,11 @@ public static List getBackendsV2( for (String feNode : feNodeList) { try { - String beUrl = "http://" + feNode + BACKENDS_V2; + if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) { + feNode = "http://" + feNode; + } + String beUrl = feNode + BACKENDS_V2; + logger.warn("---------------- URL set: {}", beUrl); HttpGet httpGet = new HttpGet(beUrl); String response = send(options, readOptions, httpGet, logger); logger.info("Backend Info:{}", response); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 26771c9d9..28233f65f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -98,8 +98,10 @@ public String getAvailableBackend(int subtaskId) { public static boolean tryHttpConnection(String host) { try { + if (!host.startsWith("http://") && !host.startsWith("https://")) { + host = "http://" + host; + } LOG.debug("try to connect host {}", host); - host = "http://" + host; URL url = new URL(host); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("GET"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 50ec1d34a..aac56977b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; - private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s"; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; private String charsetEncoding = "UTF-8"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java index ef59ef418..b3fb69a35 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -122,12 +122,12 @@ public void testParseIdentifierIllegal() throws Exception { @Test public void testChoiceFe() throws Exception { - String validFes = "1,2,3"; + String validFes = "1,http://2,https://3"; String fe = RestService.randomEndpoint(validFes, logger); List feNodes = new ArrayList<>(3); - feNodes.add("1"); - feNodes.add("2"); - feNodes.add("3"); + feNodes.add("http://1"); + feNodes.add("http://2"); + feNodes.add("https://3"); Assert.assertTrue(feNodes.contains(fe)); String emptyFes = ""; @@ -416,7 +416,7 @@ public void testParseBackendV2Error() throws Exception { public void testGetBackendsV2() { DorisOptions options = DorisOptions.builder() - .setFenodes("127.0.0.1:1,127.0.0.1:2") + .setFenodes("https://127.0.0.1:1,http://127.0.0.1:2,127.0.0.1:3") .setAutoRedirect(false) .build(); DorisReadOptions readOptions = DorisReadOptions.defaults();