Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP #8941

Merged
merged 59 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8d3d6ea
[Dependabot](deps): Bump actions/checkout from 2 to 3
dependabot[bot] May 1, 2022
c94f632
[INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to…
haibo-duan Sep 20, 2023
51eeeac
Merge branch 'master' into INLONG-8674
haibo-duan Sep 20, 2023
2f421e0
[INLONG-8674][Manager] add license
haibo-duan Sep 20, 2023
145cff1
[INLONG-8674][Manager] fix format violations
haibo-duan Sep 20, 2023
b83d7d9
[INLONG-8674][Manager] fix format violations PulsarUtilsTest
haibo-duan Sep 20, 2023
3eb4d6d
[INLONG-8674][Manager] fix format violations PulsarUtils PulsarResour…
haibo-duan Sep 20, 2023
297d53e
[INLONG-8674][Manager] fix format violations PulsarUtils PulsarResour…
haibo-duan Sep 20, 2023
aafa196
[INLONG-8674][Manager] fix format violations PulsarUtils PulsarResour…
haibo-duan Sep 20, 2023
89c1df0
[INLONG-8674][Manager] fix format violations PulsarUtils PulsarResour…
haibo-duan Sep 20, 2023
59bb210
[INLONG-8674][Manager] remove the pulsar dependency in the pom
haibo-duan Sep 21, 2023
810367e
[INLONG-8674][Manager] remove the pulsar dependency in the pom
haibo-duan Sep 21, 2023
b82b5c8
[INLONG-8674][Manager] remove the pulsar dependency in the pom
haibo-duan Sep 21, 2023
dfa8ca6
[INLONG-8674][Manager] remove the pulsar dependency in the pom
haibo-duan Sep 21, 2023
d332832
[INLONG-8674][Manager] remove the pulsar dependency in the pom
haibo-duan Sep 21, 2023
e7ecc3c
[INLONG-8674][Manager] resolve conflicts
haibo-duan Sep 28, 2023
bbd3a0f
[INLONG-8674][Manager] resolve conflicts
haibo-duan Sep 28, 2023
6176fe4
[INLONG-8674][Manager] resolve conflicts
haibo-duan Sep 28, 2023
2849bdb
Merge branch 'INLONG-8674' of https://github.com/haibo-duan/inlong in…
haibo-duan Sep 28, 2023
046ec5d
chmod 644 to 755
haibo-duan Oct 8, 2023
0b7d211
fix comment
haibo-duan Oct 17, 2023
3f8a743
fix comment
haibo-duan Oct 17, 2023
747345a
[INLONG-8676][Manager] convert hump to underline
haibo-duan Oct 17, 2023
e426d6c
Merge pull request #1 from haibo-duan/dependabot/github_actions/actio…
haibo-duan Oct 17, 2023
fbccbaf
[INLONG-8676][Manager] add specific description
haibo-duan Oct 23, 2023
3653b38
[INLONG-8676][Manager] fix bug :404 Not Found: "HTTP ERROR 404 Not Fo…
haibo-duan Oct 25, 2023
c139b9d
[INLONG-8676][Manager] fix bug :404 Not Found: "HTTP ERROR 404 Not Fo…
haibo-duan Oct 25, 2023
5b0aeb0
[INLONG-8676][Manager] fix bug :404 Not Found: "HTTP ERROR 404 Not Fo…
haibo-duan Oct 25, 2023
337863d
Merge branch 'master' of https://github.com/haibo-duan/inlong into IN…
haibo-duan Oct 25, 2023
fba29e4
Merge branch 'master' of https://github.com/haibo-duan/inlong
haibo-duan Oct 25, 2023
63d0c01
Merge branch 'apache:master' into INLONG-8674
haibo-duan Oct 25, 2023
bd152c7
Merge branch 'master' into INLONG-8674
haibo-duan Oct 25, 2023
e567b60
[INLONG-8676][Manager] merge master
haibo-duan Oct 25, 2023
9521260
Merge branch 'INLONG-8674' of https://github.com/haibo-duan/inlong in…
haibo-duan Oct 25, 2023
ad804bf
[INLONG-8676][Manager] spotless check
haibo-duan Oct 25, 2023
f649ee1
[INLONG-8676][Manager] add test case
haibo-duan Oct 25, 2023
7d90feb
[INLONG-8676][Manager] check style
haibo-duan Oct 25, 2023
2c67d83
[INLONG-8676][Manager] modify pulsar DEFAULT_SERVICE_URL
haibo-duan Oct 25, 2023
3ed556b
[INLONG-8676][Manager] modify pulsar DEFAULT_SERVICE_URL
haibo-duan Oct 25, 2023
c99ed53
[INLONG-8676][Manager] fix bug and add test case
haibo-duan Oct 29, 2023
2e9a13a
[INLONG-8676][Manager] fix bug and add test case
haibo-duan Oct 29, 2023
c5ec819
[INLONG-8676][Manager] mvn spotless:apply to fix these violations
haibo-duan Oct 29, 2023
6a7dbc2
[INLONG-8676][Manager] mvn spotless:apply to fix these violations
haibo-duan Oct 29, 2023
a331d22
[INLONG-8676][Manager] Override the examineMessage method
haibo-duan Oct 31, 2023
c6f3a5b
[INLONG-8676][Manager] mvn spotless:apply
haibo-duan Oct 31, 2023
7507db9
[INLONG-8676][Manager] mvn spotless:apply
haibo-duan Oct 31, 2023
ad5345f
[INLONG-8676][Manager] Add netty configuration in pom file
haibo-duan Nov 1, 2023
375d0c7
[INLONG-8676][Manager] remove netty
haibo-duan Nov 1, 2023
3bba643
[INLONG-8676][Manager] remove netty
haibo-duan Nov 1, 2023
158f152
[INLONG-8676][Manager] fix comments
haibo-duan Nov 2, 2023
f84c5ce
[INLONG-8676][Manager] fix comments
haibo-duan Nov 2, 2023
f6bd178
[INLONG-8676][Manager] fix comments
haibo-duan Nov 2, 2023
360682b
[INLONG-8676][Manager] fix comments
haibo-duan Nov 2, 2023
02bea80
[INLONG-8676][Manager] add sleep time
haibo-duan Nov 2, 2023
a4cf30d
[INLONG-8676][Manager] add sleep time
haibo-duan Nov 5, 2023
f8db827
[INLONG-8676][Manager] Modify constant name
haibo-duan Nov 5, 2023
1310ac0
[INLONG-8676][Manager] Modify variable name.
haibo-duan Nov 13, 2023
c0234b5
Merge branch 'master' of https://github.com/apache/inlong
haibo-duan Nov 17, 2023
c44ede9
Merge branch 'master' into INLONG-8674
haibo-duan Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {

try {
// test connect for pulsar adminUrl
PulsarUtils.getPulsarTenants(restTemplate, pulsarInfo);
PulsarUtils.getTenants(restTemplate, pulsarInfo);
return true;
} catch (Exception e) {
String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarInfo.getAdminUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private boolean getPulsarConnection(String adminUrl, String token) {
.token(token).build();
try {
// test connect for pulsar adminUrl
PulsarUtils.getPulsarTenants(restTemplate, pulsarClusterInfo);
PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
return true;
} catch (Exception e) {
String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarClusterInfo.getAdminUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) thr
Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty");

try {
List<String> clusters = PulsarUtils.getPulsarClusters(restTemplate, pulsarClusterInfo);
List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
boolean exists = this.tenantIsExists(pulsarClusterInfo, tenant);
if (exists) {
LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
Expand Down Expand Up @@ -179,10 +179,10 @@ public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo top
LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
} else {
// The number of brokers as the default value of topic partition
List<String> clusters = PulsarUtils.getPulsarClusters(restTemplate, pulsarClusterInfo);
List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
Integer numPartitions = topicInfo.getNumPartitions();
if (numPartitions < 0 || numPartitions >= MAX_PARTITION) {
List<String> brokers = PulsarUtils.getPulsarBrokers(restTemplate, pulsarClusterInfo);
List<String> brokers = PulsarUtils.getBrokers(restTemplate, pulsarClusterInfo);
numPartitions = brokers.size();
}
PulsarUtils.createPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName,
Expand Down Expand Up @@ -284,7 +284,7 @@ public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subs
* @throws Exception any exception if occurred
*/
private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
List<String> tenants = PulsarUtils.getPulsarTenants(restTemplate, pulsarClusterInfo);
List<String> tenants = PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
return tenants.contains(tenant);
}

Expand All @@ -299,7 +299,7 @@ private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenan
*/
private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace)
throws Exception {
List<String> namespaces = PulsarUtils.getPulsarNamespaces(restTemplate, pulsarClusterInfo, tenant);
List<String> namespaces = PulsarUtils.getNamespaces(restTemplate, pulsarClusterInfo, tenant);
return namespaces.contains(namespace);
}

Expand All @@ -320,10 +320,10 @@ public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, S
boolean topicExists = false;
try {
if (isPartitioned) {
topics = PulsarUtils.getPulsarPartitionedTopics(restTemplate, pulsarClusterInfo, tenant,
topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo, tenant,
namespace);
} else {
topics = PulsarUtils.getPulsarTopics(restTemplate, pulsarClusterInfo, tenant, namespace);
topics = PulsarUtils.getTopics(restTemplate, pulsarClusterInfo, tenant, namespace);
}
for (String t : topics) {
t = t.substring(t.lastIndexOf("/") + 1); // not contains /
Expand All @@ -346,7 +346,7 @@ public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, S
LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
Thread.sleep(DELAY_SECONDS);

topics = PulsarUtils.getPulsarPartitionedTopics(restTemplate, pulsarClusterInfo,
topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo,
tenant, namespace);
for (String t : topics) {
t = t.substring(t.lastIndexOf("/") + 1);
Expand Down Expand Up @@ -430,7 +430,7 @@ public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterIn
private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) {
PulsarTopicMetadata pulsarTopicMetadata;
try {
pulsarTopicMetadata = PulsarUtils.getPulsarPartitionedTopicMetadata(restTemplate,
pulsarTopicMetadata = PulsarUtils.getPartitionedTopicMetadata(restTemplate,
pulsarClusterInfo, topicFullName);
} catch (Exception e) {
String errMsg = "get pulsar partition error ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static HttpHeaders getHttpHeaders(String token) {
* @return list of pulsar cluster infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
public static List<String> getClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
throws Exception {
final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand All @@ -114,17 +114,17 @@ public static List<String> getPulsarClusters(RestTemplate restTemplate, PulsarCl
* @return list of pulsar broker infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
public static List<String> getBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
throws Exception {
List<String> clusters = getPulsarClusters(restTemplate, clusterInfo);
List<String> clusters = getClusters(restTemplate, clusterInfo);
List<String> brokers = new ArrayList<>();
for (String brokerName : brokers) {
for (String brokerName : clusters) {
String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" + brokerName;
clusters.addAll(
brokers.addAll(
HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
ArrayList.class));
}
return clusters;
return brokers;
}

/**
Expand All @@ -135,7 +135,7 @@ public static List<String> getPulsarBrokers(RestTemplate restTemplate, PulsarClu
* @return list of pulsar tenant infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
public static List<String> getTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
throws Exception {
final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand All @@ -151,7 +151,7 @@ public static List<String> getPulsarTenants(RestTemplate restTemplate, PulsarClu
* @return list of pulsar namespace infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
public static List<String> getNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
String tenant) throws Exception {
String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + tenant;
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand Down Expand Up @@ -210,7 +210,7 @@ public static void createNamespace(RestTemplate restTemplate, PulsarClusterInfo
* @return list of pulsar topic infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
public static List<String> getTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
String namespace) throws Exception {
String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace;
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand All @@ -227,7 +227,7 @@ public static List<String> getPulsarTopics(RestTemplate restTemplate, PulsarClus
* @return list of pulsar partitioned topic infos
* @throws Exception any exception if occurred
*/
public static List<String> getPulsarPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
public static List<String> getPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
String tenant, String namespace) throws Exception {
String url =
clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace + "/partitioned";
Expand Down Expand Up @@ -273,7 +273,7 @@ public static void createPartitionedTopic(RestTemplate restTemplate, PulsarClust
* @return pulsar internal stat info of partitioned topic
* @throws Exception any exception if occurred
*/
public static JsonObject getPulsarStatsPartitionedTopics(RestTemplate restTemplate,
public static JsonObject getStatsPartitionedTopics(RestTemplate restTemplate,
haibo-duan marked this conversation as resolved.
Show resolved Hide resolved
PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitioned-internalStats";
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand All @@ -289,7 +289,7 @@ public static JsonObject getPulsarStatsPartitionedTopics(RestTemplate restTempla
* @return pulsar topic metadata info
* @throws Exception any exception if occurred
*/
public static PulsarTopicMetadata getPulsarPartitionedTopicMetadata(RestTemplate restTemplate,
public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate restTemplate,
PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
Expand Down Expand Up @@ -421,7 +421,7 @@ public static String lookupTopic(RestTemplate restTemplate, PulsarClusterInfo cl
*/
public static Map<String, String> lookupPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
String topicPath) throws Exception {
PulsarTopicMetadata metadata = getPulsarPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
PulsarTopicMetadata metadata = getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
Map<String, String> map = new LinkedHashMap<>();
for (int i = 0; i < metadata.getPartitions(); i++) {
String partitionTopicName = topicPath + "-partition-" + i;
Expand Down
Loading