From 9b5f1ce54fdba255aab17bf2dcd958382d5c3ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vladim=C3=ADr=20Dudr?= Date: Wed, 25 Sep 2024 10:56:39 +0200 Subject: [PATCH] support specifying fe protocol --- .../apache/doris/flink/rest/RestService.java | 17 ++++++++++++----- .../doris/flink/rest/models/BackendV2.java | 11 +++++++++++ .../apache/doris/flink/sink/BackendUtil.java | 11 ++++------- .../doris/flink/sink/DorisCommittable.java | 2 +- .../flink/sink/batch/DorisBatchStreamLoad.java | 2 +- .../flink/sink/committer/DorisCommitter.java | 2 +- .../doris/flink/sink/copy/BatchStageLoad.java | 6 ++++-- .../flink/sink/copy/DorisCopyCommittable.java | 2 +- .../flink/sink/copy/DorisCopyCommitter.java | 2 +- .../flink/sink/schema/SchemaChangeManager.java | 2 +- .../flink/sink/writer/DorisStreamLoad.java | 6 +++--- .../doris/flink/rest/TestRestService.java | 10 +++++----- .../flink/sink/copy/TestDorisCopyWriter.java | 2 +- .../flink/sink/writer/TestDorisWriter.java | 2 +- 14 files changed, 47 insertions(+), 30 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 1663d4b39..1e1e9848e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -91,8 +91,8 @@ public class RestService implements Serializable { private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; private static final String FE_LOGIN = "/rest/v1/login"; private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; - private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; + private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema"; + private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan"; /** * send request to Doris FE and get response json string. @@ -131,6 +131,7 @@ private static String send( RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) + .setRedirectsEnabled(true) .build(); request.setConfig(requestConfig); @@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger) Collections.shuffle(nodes); for (String feNode : nodes) { String host = feNode.trim(); + if (!host.startsWith("http://") && !host.startsWith("https://")) { + host = "http://" + host; + } if (BackendUtil.tryHttpConnection(host)) { return host; } @@ -359,7 +363,11 @@ public static List getBackendsV2( for (String feNode : feNodeList) { try { - String beUrl = "http://" + feNode + BACKENDS_V2; + if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) { + feNode = "http://" + feNode; + } + String beUrl = feNode + BACKENDS_V2; + logger.warn("---------------- URL set: {}", beUrl); HttpGet httpGet = new HttpGet(beUrl); String response = send(options, readOptions, httpGet, logger); logger.info("Backend Info:{}", response); @@ -387,8 +395,7 @@ public static List getBackendsV2( private static List convert(List feNodeList) { List nodeList = new ArrayList<>(); for (String node : feNodeList) { - String[] split = node.split(":"); - nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), true)); + nodeList.add(BackendRowV2.ofUrl(node, true)); } return nodeList; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java index 0a160e7c5..35ff4cc31 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java @@ -52,6 +52,9 @@ public String getIp() { } public void setIp(String ip) { + if (!ip.startsWith("http://") && !ip.startsWith("https://")) { + ip = "http://" + ip; + } this.ip = ip; } @@ -82,5 +85,13 @@ public static BackendRowV2 of(String ip, int httpPort, boolean alive) { rowV2.setAlive(alive); return rowV2; } + + public static BackendRowV2 ofUrl(String url, boolean alive) { + int lastColon = url.lastIndexOf(":"); + return BackendRowV2.of( + url.substring(0, lastColon), + Integer.valueOf(url.substring(lastColon + 1)), + alive); + } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 26771c9d9..2bfb1e434 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -58,12 +58,7 @@ private List initBackends(String beNodes) { if (tryHttpConnection(node)) { LOG.info("{} backend http connection success.", node); node = node.trim(); - String[] ipAndPort = node.split(":"); - BackendRowV2 backendRowV2 = new BackendRowV2(); - backendRowV2.setIp(ipAndPort[0]); - backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1])); - backendRowV2.setAlive(true); - backends.add(backendRowV2); + backends.add(BackendRowV2.ofUrl(node, true)); } }); return backends; @@ -98,8 +93,10 @@ public String getAvailableBackend(int subtaskId) { public static boolean tryHttpConnection(String host) { try { + if (!host.startsWith("http://") && !host.startsWith("https://")) { + host = "http://" + host; + } LOG.debug("try to connect host {}", host); - host = "http://" + host; URL url = new URL(host); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("GET"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java index 78560fda5..ed3a51f28 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java @@ -26,7 +26,7 @@ public class DorisCommittable implements DorisAbstractCommittable { private final long txnID; public DorisCommittable(String hostPort, String db, long txnID) { - this.hostPort = hostPort; + this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort; this.db = db; this.txnID = txnID; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 42b832076..585fbd2a3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -89,7 +89,7 @@ public class DorisBatchStreamLoad implements Serializable { private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE; private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; - private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; + private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load"; private String loadUrl; private String hostPort; private final String username; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index eafffd53a..dd79e2967 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -52,7 +52,7 @@ /** The committer to commit transaction. */ public class DorisCommitter implements Committer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class); - private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc"; + private static final String commitPattern = "%s/api/%s/_stream_load_2pc"; private final CloseableHttpClient httpClient; private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java index 2c5ed5c27..137dcb3df 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java @@ -67,7 +67,7 @@ public class BatchStageLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(BatchStageLoad.class); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; - private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload"; + private static final String UPLOAD_URL_PATTERN = "%s/copy/upload"; private static final String LINE_DELIMITER_KEY_WITH_PRETIX = "file.line_delimiter"; private String uploadUrl; private String hostPort; @@ -96,7 +96,9 @@ public BatchStageLoad( this.password = dorisOptions.getPassword(); this.loadProps = executionOptions.getStreamLoadProp(); this.labelGenerator = labelGenerator; - this.hostPort = dorisOptions.getFenodes(); + this.hostPort = + (dorisOptions.getFenodes().startsWith("http") ? "" : "http://") + + dorisOptions.getFenodes(); this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort); this.fileNum = new AtomicInteger(); this.lineDelimiter = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java index 93869ce13..c22d6d353 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java @@ -26,7 +26,7 @@ public class DorisCopyCommittable implements DorisAbstractCommittable { private final String copySQL; public DorisCopyCommittable(String hostPort, String copySQL) { - this.hostPort = hostPort; + this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort; this.copySQL = copySQL; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java index 095c680a7..b55599e08 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java @@ -44,7 +44,7 @@ public class DorisCopyCommitter implements Committer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisCopyCommitter.class); - private static final String commitPattern = "http://%s/copy/query"; + private static final String commitPattern = "%s/copy/query"; private static final int SUCCESS = 0; private static final String FAIL = "1"; private ObjectMapper objectMapper = new ObjectMapper(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 50ec1d34a..aac56977b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; - private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s"; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; private String charsetEncoding = "UTF-8"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 44ff573e7..dda653c93 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -73,8 +73,8 @@ public class DorisStreamLoad implements Serializable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; - private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; - private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; + private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load"; + private static final String ABORT_URL_PATTERN = "%s/api/%s/_stream_load_2pc"; public static final String JOB_EXIST_FINISHED = "FINISHED"; private String loadUrlStr; @@ -102,7 +102,7 @@ public DorisStreamLoad( DorisExecutionOptions executionOptions, LabelGenerator labelGenerator, CloseableHttpClient httpClient) { - this.hostPort = hostPort; + this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); this.db = tableInfo[0]; this.table = tableInfo[1]; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java index ef59ef418..b3fb69a35 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -122,12 +122,12 @@ public void testParseIdentifierIllegal() throws Exception { @Test public void testChoiceFe() throws Exception { - String validFes = "1,2,3"; + String validFes = "1,http://2,https://3"; String fe = RestService.randomEndpoint(validFes, logger); List feNodes = new ArrayList<>(3); - feNodes.add("1"); - feNodes.add("2"); - feNodes.add("3"); + feNodes.add("http://1"); + feNodes.add("http://2"); + feNodes.add("https://3"); Assert.assertTrue(feNodes.contains(fe)); String emptyFes = ""; @@ -416,7 +416,7 @@ public void testParseBackendV2Error() throws Exception { public void testGetBackendsV2() { DorisOptions options = DorisOptions.builder() - .setFenodes("127.0.0.1:1,127.0.0.1:2") + .setFenodes("https://127.0.0.1:1,http://127.0.0.1:2,127.0.0.1:3") .setAutoRedirect(false) .build(); DorisReadOptions readOptions = DorisReadOptions.defaults(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java index 9e1327be0..dfe513980 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java @@ -97,7 +97,7 @@ public void testPrepareCommit() throws Exception { Assert.assertEquals(1, committableList.size()); DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0]; - Assert.assertEquals("127.0.0.1:8030", committable.getHostPort()); + Assert.assertEquals("http://127.0.0.1:8030", committable.getHostPort()); Pattern copySql = Pattern.compile( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 3e1ab2c2c..47750e18c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -85,7 +85,7 @@ public void testPrepareCommit() throws Exception { Collection committableList = dorisWriter.prepareCommit(); Assert.assertEquals(1, committableList.size()); DorisCommittable dorisCommittable = committableList.stream().findFirst().get(); - Assert.assertEquals("local:8040", dorisCommittable.getHostPort()); + Assert.assertEquals("http://local:8040", dorisCommittable.getHostPort()); Assert.assertEquals("db", dorisCommittable.getDb()); Assert.assertEquals(2, dorisCommittable.getTxnID()); Assert.assertFalse(dorisWriter.isLoading());