Skip to content

Commit

Permalink
Revert "GEODE-9484: Improve sending message to multy destinations (#7381
Browse files Browse the repository at this point in the history
)" (#7655)

This reverts commit 62cd12c.
  • Loading branch information
mivanac authored May 5, 2022
1 parent 08b4a7b commit 0ad4c8a
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
Expand Down Expand Up @@ -51,8 +50,6 @@
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
Expand All @@ -71,76 +68,45 @@
* the same across servers
*/
@Category({ClientSubscriptionTest.class})
public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase {
public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {

private static final String REGION_NAME = "UpdatePropagationDUnitTest_region";

private VM server1 = null;
private VM server2 = null;
private VM server3 = null;
private VM client1 = null;
private VM client2 = null;

private int PORT1;
private int PORT2;
private int PORT3;

private final int minNumEntries = 2;

private String hostnameServer1;
private String hostnameServer3;

@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();

final Host host = Host.getHost(0);

// Server1 VM
server1 = host.getVM(0);

// Server2 VM
server2 = host.getVM(1);

server3 = host.getVM(2);

client1 = host.getVM(3);

client2 = host.getVM(4);
// Client 1 VM
client1 = host.getVM(2);

PORT1 = server1.invoke(() -> createServerCache());
PORT2 = server2.invoke(() -> createServerCache());
PORT3 = server3.invoke(() -> createServerCache());
// client 2 VM
client2 = host.getVM(3);

hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost());
hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost());

IgnoredException.addIgnoredException("java.net.SocketException");
IgnoredException.addIgnoredException("Unexpected IOException");
}
PORT1 = server1.invoke(this::createServerCache);
PORT2 = server2.invoke(this::createServerCache);



@Test
public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception {
client1.invoke(
() -> createClientCache(hostnameServer1, PORT1));
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
client2.invoke(
() -> createClientCache(hostnameServer3, PORT3));
int entries = 20;
AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries));

// Wait for some entries to be put
server1.invoke(this::verifyMinEntriesInserted);
() -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));

// Simulate crash
server2.invoke(() -> {
MembershipManagerHelper.crashDistributedSystem(getSystemStatic());
});

invocation.await();

int notNullEntriesIn1 = client1.invoke(() -> getNotNullEntriesNumber(entries));
int notNullEntriesIn3 = client2.invoke(() -> getNotNullEntriesNumber(entries));
assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1);
IgnoredException.addIgnoredException("java.net.SocketException");
IgnoredException.addIgnoredException("Unexpected IOException");
}

/**
Expand All @@ -149,11 +115,6 @@ public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception {
*/
@Test
public void updatesAreProgegatedAfterFailover() {
client1.invoke(
() -> createClientCache(hostnameServer1, PORT1, PORT2));
client2.invoke(
() -> createClientCache(hostnameServer1, PORT1, PORT2));

// First create entries on both servers via the two client
client1.invoke(this::createEntriesK1andK2);
client2.invoke(this::createEntriesK1andK2);
Expand Down Expand Up @@ -287,18 +248,6 @@ private void createClientCache(String host, Integer port1, Integer port2) throws
.addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME);
}

private void createClientCache(String host, Integer port1) {
Properties props = new Properties();
props.setProperty(LOCATORS, "");
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false)
.setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
.setPoolReadTimeout(100).setPoolPingInterval(300);
ClientCache cache = getClientCache(cf);
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.create(REGION_NAME);
}

private Integer createServerCache() throws Exception {
Cache cache = getCache();
RegionAttributes attrs = createCacheServerAttributes();
Expand All @@ -309,7 +258,7 @@ private Integer createServerCache() throws Exception {
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
return server.getPort();
}

protected RegionAttributes createCacheServerAttributes() {
Expand Down Expand Up @@ -356,36 +305,6 @@ private void verifyUpdates() {
});
}

private void verifyMinEntriesInserted() {
await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + REGION_NAME))
.hasSizeGreaterThan(minNumEntries));
}

private void doPuts(int entries) throws Exception {
Region<String, String> r1 = getCache().getRegion(REGION_NAME);
assertThat(r1).isNotNull();
for (int i = 0; i < entries; i++) {
try {
r1.put("" + i, "" + i);
} catch (Exception e) {
}
Thread.sleep(1000);
}
}

private int getNotNullEntriesNumber(int entries) {
int notNullEntries = 0;
Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
assertThat(r1).isNotNull();
for (int i = 0; i < entries; i++) {
Object value = r1.get("" + i, "" + i);
if (value != null) {
notNullEntries++;
}
}
return notNullEntries;
}

private static class EventTrackingCacheListener extends CacheListenerAdapter {

List<EntryEvent> receivedEvents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* subclass of UpdatePropagationDUnitTest to exercise partitioned regions
*/
public class UpdatePropagationPRDistributedTest extends UpdatePropagationDistributedTest {
public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {

@Override
protected RegionAttributes createCacheServerAttributes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void sharedSenderShouldRecoverFromClosedSocket() {
InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem();
InternalDistributedMember otherMember = distributedSystem.getDistributionManager()
.getOtherNormalDistributionManagerIds().iterator().next();
Connection connection = conTable.getConduit().getConnection(otherMember, true,
Connection connection = conTable.getConduit().getConnection(otherMember, true, false,
System.currentTimeMillis(), 15000, 0);
await().untilAsserted(() -> {
// grab the shared, ordered "sender" connection to vm0. It should have a residual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void basicAcceptConnection() throws Exception {
assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();

Connection sharedUnordered = connectionTable.get(otherMember, false,
System.currentTimeMillis(), 15000, 0, false);
System.currentTimeMillis(), 15000, 0);
sharedUnordered.requestClose("for testing");
// the sender connection has been closed so we should only have 2 senders now
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,27 +281,18 @@ private int sendToMany(final Membership mgr,
directReply = false;
}
if (ce != null) {

if (!retry) {
retryInfo = ce;
if (failedCe != null) {
failedCe.getMembers().addAll(ce.getMembers());
failedCe.getCauses().addAll(ce.getCauses());
} else {

if (failedCe != null) {
failedCe.getMembers().addAll(ce.getMembers());
failedCe.getCauses().addAll(ce.getCauses());
} else {
failedCe = ce;
}
failedCe = ce;
}
ce = null;
}
if (cons.isEmpty()) {
if (failedCe != null) {
throw failedCe;
}
if (retryInfo != null) {
continue;
}
return bytesWritten;
}

Expand Down Expand Up @@ -347,12 +338,7 @@ private int sendToMany(final Membership mgr,
}

if (ce != null) {
if (retryInfo != null) {
retryInfo.getMembers().addAll(ce.getMembers());
retryInfo.getCauses().addAll(ce.getCauses());
} else {
retryInfo = ce;
}
retryInfo = ce;
ce = null;
}

Expand Down Expand Up @@ -437,13 +423,13 @@ private ConnectExceptions readAcks(List sentCons, long startTime, long ackTimeou
* @param retry whether this is a retransmission
* @param ackTimeout the ack warning timeout
* @param ackSDTimeout the ack severe alert timeout
* @param connectionsList a list to hold the connections
* @param cons a list to hold the connections
* @return null if everything went okay, or a ConnectExceptions object if some connections
* couldn't be obtained
*/
private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg,
InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry,
long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) {
long ackTimeout, long ackSDTimeout, List cons) {
ConnectExceptions ce = null;
for (InternalDistributedMember destination : destinations) {
if (destination == null) {
Expand Down Expand Up @@ -472,18 +458,12 @@ private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
}
final Connection connection;
if (!retry) {
connection = conduit.getFirstScanForConnection(destination, preserveOrder, startTime,
ackTimeout, ackSDTimeout);
} else {
connection = conduit.getConnection(destination, preserveOrder, startTime,
ackTimeout, ackSDTimeout);
}
Connection con = conduit.getConnection(destination, preserveOrder, retry, startTime,
ackTimeout, ackSDTimeout);

connection.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
connectionsList.add(connection);
if (connection.isSharedResource() && msg instanceof DirectReplyMessage) {
con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
cons.add(con);
if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
DirectReplyMessage directMessage = (DirectReplyMessage) msg;
directMessage.registerProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ static Connection createSender(final Membership<InternalDistributedMember> mgr,
final ConnectionTable t,
final boolean preserveOrder, final InternalDistributedMember remoteAddr,
final boolean sharedResource,
final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry)
final long startTime, final long ackTimeout, final long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
boolean success = false;
Connection conn = null;
Expand Down Expand Up @@ -1021,9 +1021,7 @@ static Connection createSender(final Membership<InternalDistributedMember> mgr,
// do not change the text of this exception - it is looked for in exception handlers
throw new IOException("Cannot form connection to alert listener " + remoteAddr);
}
if (doNotRetry) {
throw new IOException("Connection not created in first try to " + remoteAddr);
}

// Wait briefly...
interrupted = Thread.interrupted() || interrupted;
try {
Expand Down
Loading

0 comments on commit 0ad4c8a

Please sign in to comment.