SOLR-17519: CloudSolrClient with HTTP ClusterState can forget live nodes and then fail #2935

Status: Open · wants to merge 5 commits into base: main
@@ -21,8 +21,13 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -51,6 +56,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private String urlScheme;
private Set<String> initialNodes;
volatile Set<String> liveNodes;
long liveNodesTimestamp = 0;
volatile Map<String, List<String>> aliases;
@@ -61,6 +67,7 @@ public abstract class BaseHttpClusterStateProvider
private int cacheTimeout = EnvUtils.getPropertyAsInteger("solr.solrj.cache.timeout.sec", 5);

public void init(List<String> solrUrls) throws Exception {
this.initialNodes = getNodeNamesFromSolrUrls(solrUrls);
@mlbiscoc (contributor, author) commented Jan 10, 2025:

> I think the idea of this JIRA issue is that we'll take the Solr URLs as configured and use this as the initial / backup liveNodes. I think this is a very simple idea to document/understand/implement.

That makes sense. That leaves me with a few questions, then. Shouldn't this take a list of URL or URI Java objects, to verify the URLs aren't malformed, instead of a list of strings? I created the functions below to convert these strings into cluster-state node names for liveNodes.
Contributor: Great idea to do basic malformed-URL checks like this.

for (String solrUrl : solrUrls) {
urlScheme = solrUrl.startsWith("https") ? "https" : "http";
try (SolrClient initialClient = getSolrClient(solrUrl)) {
@@ -136,7 +143,7 @@ private ClusterState fetchClusterState(SolrClient client)

List<String> liveNodesList = (List<String>) cluster.get("live_nodes");
if (liveNodesList != null) {
-      this.liveNodes = Set.copyOf(liveNodesList);
+      setLiveNodes(Set.copyOf(liveNodesList));
liveNodesTimestamp = System.nanoTime();
}

@@ -229,10 +236,9 @@
for (String nodeName : liveNodes) {
Contributor: If we exhaust liveNodes, shouldn't we then try the initially configured nodes, and only failing that, throw an exception?
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
-        Set<String> liveNodes = fetchLiveNodes(client);
-        this.liveNodes = (liveNodes);
+        setLiveNodes(fetchLiveNodes(client));
        liveNodesTimestamp = System.nanoTime();
-        return liveNodes;
+        return this.liveNodes;
} catch (Exception e) {
log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
}
@@ -246,7 +252,7 @@
+ " you could try re-creating a new CloudSolrClient using working"
+ " solrUrl(s) or zkHost(s).");
} else {
-      return liveNodes; // cached copy is fresh enough
+      return this.liveNodes; // cached copy is fresh enough
}
}
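The reviewer's suggestion above — once liveNodes is exhausted, also try the initially configured nodes before throwing — could be sketched roughly like this. This is hypothetical standalone code, not the PR's implementation; `firstReachable` and `isReachable` are made-up names, with `isReachable` standing in for the real HTTP fetch attempt:

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class FallbackDemo {
    // Try the current liveNodes first, then the initially configured nodes,
    // and only throw once every candidate has failed.
    static String firstReachable(Set<String> liveNodes, Set<String> initialNodes) {
        Set<String> candidates = new LinkedHashSet<>(liveNodes);
        candidates.addAll(initialNodes); // initial nodes are tried last
        for (String nodeName : candidates) {
            if (isReachable(nodeName)) {
                return nodeName; // in the real client: fetch live_nodes here
            }
        }
        throw new RuntimeException("Tried all candidate nodes and none responded");
    }

    // Stand-in for the real HTTP fetch attempt.
    static boolean isReachable(String nodeName) {
        return nodeName.startsWith("up-");
    }

    public static void main(String[] args) {
        System.out.println(firstReachable(
            Set.of("down-1:8983_solr"), Set.of("up-2:8983_solr"))); // prints "up-2:8983_solr"
    }
}
```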

@@ -358,7 +364,6 @@ public ClusterState getClusterState() {
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> getClusterProperties() {
-    // Map<String, Object> clusterPropertiesMap = new HashMap<>();
for (String nodeName : liveNodes) {
String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
try (SolrClient client = getSolrClient(baseUrl)) {
@@ -413,6 +418,29 @@ public String getQuorumHosts() {
return String.join(",", this.liveNodes);
}

/** liveNodes always holds the latest set of live nodes, but never drops the initially configured set. */
private void setLiveNodes(Set<String> nodes) {
Set<String> liveNodes = new HashSet<>(nodes);
liveNodes.addAll(this.initialNodes);
this.liveNodes = Set.copyOf(liveNodes);
Comment on lines +423 to +425

Contributor: Why is this three lines of extra copying instead of nothing more than this.liveNodes = Set.copyOf(nodes);? That is, why are we touching / using initialNodes at all here? Maybe throw an IllegalArgumentException if nodes.isEmpty().

mlbiscoc (author): My idea was to avoid another loop through the nodes if liveNodes was exhausted; initialNodes would always exist in liveNodes with this set method. Let me refactor again per your suggestion.
}
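Isolated from the class, the union behavior of this setter can be sketched as follows. The class and node names here are made up for illustration; the merge logic mirrors the diff above:

```java
import java.util.HashSet;
import java.util.Set;

public class LiveNodesDemo {
    // Mirrors the PR's fields; the node names are illustrative.
    static final Set<String> initialNodes = Set.of("127.0.0.1:8983_solr");
    static volatile Set<String> liveNodes = Set.of();

    // Union the freshly fetched nodes with the initially configured ones,
    // so the initial nodes are never dropped from liveNodes.
    static void setLiveNodes(Set<String> nodes) {
        Set<String> merged = new HashSet<>(nodes);
        merged.addAll(initialNodes);
        liveNodes = Set.copyOf(merged);
    }

    public static void main(String[] args) {
        setLiveNodes(Set.of("127.0.0.1:7574_solr"));
        System.out.println(liveNodes.contains("127.0.0.1:8983_solr")); // prints "true"
    }
}
```

Even after a fetch that no longer reports the initial node, it remains present, which is exactly the retention behavior the reviewer questions above.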

public Set<String> getNodeNamesFromSolrUrls(List<String> urls)
throws URISyntaxException, MalformedURLException {
Set<String> urlSet = new HashSet<>();
for (String url : urls) {
urlSet.add(getNodeNameFromSolrUrl(url));
}
return Collections.unmodifiableSet(urlSet);
}

/** URL to cluster state node name (http://127.0.0.1:12345/solr to 127.0.0.1:12345_solr) */
public String getNodeNameFromSolrUrl(String solrUrl)
throws MalformedURLException, URISyntaxException {
URL url = new URI(solrUrl).toURL();
return url.getAuthority() + url.getPath().replace('/', '_');
}
Comment on lines +437 to +442

Contributor: Can you find other code in Solr doing this? Surely it's somewhere.

Contributor: This should be a static method with a unit test.

mlbiscoc (author): Hmm, maybe... I found getBaseUrlForNodeName, but it goes the other way (nodeName -> URL). Let me look around more.

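As a standalone sketch, the URL-to-node-name conversion above behaves like this. The class and method names are hypothetical; the body copies the logic of getNodeNameFromSolrUrl from the diff:

```java
import java.net.URI;
import java.net.URL;

public class NodeNameDemo {
    // Converts a Solr base URL to a cluster-state node name,
    // e.g. http://127.0.0.1:12345/solr -> 127.0.0.1:12345_solr
    static String nodeNameFromSolrUrl(String solrUrl) throws Exception {
        URL url = new URI(solrUrl).toURL(); // rejects malformed or relative URLs
        return url.getAuthority() + url.getPath().replace('/', '_');
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nodeNameFromSolrUrl("http://127.0.0.1:12345/solr")); // prints "127.0.0.1:12345_solr"
    }
}
```

Going through java.net.URI first gives the basic malformed-URL check discussed in the review thread: a bad input fails fast with URISyntaxException or MalformedURLException instead of producing a nonsense node name.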
private enum ClusterStateRequestType {
FETCH_LIVE_NODES,
FETCH_CLUSTER_PROP,
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -35,14 +36,15 @@
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.NamedList;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;

public class ClusterStateProviderTest extends SolrCloudTestCase {

@BeforeClass
public static void setupCluster() throws Exception {
-    configureCluster(1)
+    configureCluster(2)
.addConfig(
"conf",
getFile("solrj")
@@ -51,6 +53,15 @@ public static void setupCluster() throws Exception {
.resolve("streaming")
.resolve("conf"))
.configure();
cluster.waitForAllNodes(30);
System.setProperty("solr.solrj.cache.timeout.sec", "1");
}

@After
public void cleanup() throws Exception {
while (cluster.getJettySolrRunners().size() < 2) {
cluster.startJettySolrRunner();
}
}

@ParametersFactory
@@ -59,10 +70,13 @@ public static Iterable<String[]> parameters() throws NoSuchMethodException {
new String[] {"http2ClusterStateProvider"}, new String[] {"zkClientClusterStateProvider"});
}

-  private static ClusterStateProvider http2ClusterStateProvider() {
+  private static Http2ClusterStateProvider http2ClusterStateProvider() {
try {
return new Http2ClusterStateProvider(
-          List.of(cluster.getJettySolrRunner(0).getBaseUrl().toString()), null);
+          List.of(
+              cluster.getJettySolrRunner(0).getBaseUrl().toString(),
+              cluster.getJettySolrRunner(1).getBaseUrl().toString()),
+          null);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -160,7 +174,6 @@ public void testClusterStateProvider() throws SolrServerException, IOException {

try (var cspZk = zkClientClusterStateProvider();
var cspHttp = http2ClusterStateProvider()) {

assertThat(cspZk.getClusterProperties(), Matchers.hasEntry("ext.foo", "bar"));
assertThat(
cspZk.getClusterProperties().entrySet(),
@@ -183,4 +196,62 @@ public void testClusterStateProvider() throws SolrServerException, IOException {
clusterStateZk.getCollection("col2"), equalTo(clusterStateHttp.getCollection("col2")));
}
}

@Test
public void testClusterStateProviderDownedInitialLiveNodes() throws Exception {
try (var cspHttp = http2ClusterStateProvider()) {
var jettyNode1 = cluster.getJettySolrRunner(0);
var jettyNode2 = cluster.getJettySolrRunner(1);

Set<String> actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(2, actualLiveNodes.size());

cluster.stopJettySolrRunner(jettyNode1);
waitForCSPCacheTimeout();

actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(2, actualLiveNodes.size());

cluster.startJettySolrRunner(jettyNode1);
cluster.stopJettySolrRunner(jettyNode2);
waitForCSPCacheTimeout();

      // Should still report 2 because liveNodes never removes the initially configured nodes
actualLiveNodes = cspHttp.getLiveNodes();
assertEquals(2, actualLiveNodes.size());
}
}

@Test
public void testClusterStateProviderLiveNodesWithNewHost() throws Exception {
try (var cspHttp = http2ClusterStateProvider()) {
var jettyNode1 = cluster.getJettySolrRunner(0);
var jettyNode2 = cluster.getJettySolrRunner(1);
var jettyNode3 = cluster.startJettySolrRunner();

String nodeName1 = cspHttp.getNodeNameFromSolrUrl(jettyNode1.getBaseUrl().toString());
String nodeName2 = cspHttp.getNodeNameFromSolrUrl(jettyNode2.getBaseUrl().toString());
String nodeName3 = cspHttp.getNodeNameFromSolrUrl(jettyNode3.getBaseUrl().toString());
Set<String> expectedKnownNodes = Set.of(nodeName1, nodeName2, nodeName3);
waitForCSPCacheTimeout();

Set<String> actualKnownNodes = cspHttp.getLiveNodes();
assertEquals(3, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);

      // Stop the node that was not initially passed to the cluster state provider
cluster.stopJettySolrRunner(jettyNode3);
expectedKnownNodes = Set.of(nodeName1, nodeName2);
waitForCSPCacheTimeout();

      // Nodes that were not initially configured can be removed from known hosts
actualKnownNodes = cspHttp.getLiveNodes();
assertEquals(2, actualKnownNodes.size());
assertEquals(expectedKnownNodes, actualKnownNodes);
}
}

private void waitForCSPCacheTimeout() throws InterruptedException {
Thread.sleep(2000);
}
}
@@ -530,14 +530,14 @@ public JettySolrRunner stopJettySolrRunner(int index) throws Exception {
}

/**
-   * Add a previously stopped node back to the cluster
+   * Add a previously stopped node back to the cluster and reuse its port
*
* @param jetty a {@link JettySolrRunner} previously returned by {@link #stopJettySolrRunner(int)}
* @return the started node
* @throws Exception on error
*/
public JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception {
-    jetty.start(false);
+    jetty.start();
if (!jettys.contains(jetty)) jettys.add(jetty);
return jetty;
}