diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c0e93338a6..08e98ea116 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1092,12 +1092,15 @@ private CompletableFuture staleReadAsync(RaftClientRequest requ } return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request); } - - ReadRequests getReadRequests() { - return getState().getReadRequests(); + ReadException newReadException(String op, long installSnapshot, boolean started) { + return new ReadException(getMemberId() + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot + + ") installation is " + (started ? "started" : "in progress")); } - private CompletableFuture sendReadIndexAsync(RaftClientRequest clientRequest) { + final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex(); + if (installSnapshot != RaftLog.INVALID_LOG_INDEX) { + return JavaUtils.completeExceptionally(newReadException("get", installSnapshot, false)); + } final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); @@ -1109,11 +1112,9 @@ private CompletableFuture sendReadIndexAsync(RaftClientRequ return JavaUtils.completeExceptionally(e); } } - private CompletableFuture getReadIndex(RaftClientRequest request, LeaderStateImpl leader) { return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex); } - private CompletableFuture readAsync(RaftClientRequest request) { if (request.getType().getRead().getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { @@ -1123,12 +1124,6 @@ private CompletableFuture readAsync(RaftClientRequest request) } return queryStateMachine(request); } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){ - /* - Linearizable read using ReadIndex. See Raft paper section 6.4. - 1. First obtain readIndex from Leader. - 2. Then waits for statemachine to advance at least as far as readIndex. - 3. Finally, query the statemachine and return the result. - */ final LeaderStateImpl leader = role.getLeaderState().orElse(null); final CompletableFuture replyFuture; @@ -1146,14 +1141,19 @@ private CompletableFuture readAsync(RaftClientRequest request) } return replyFuture - .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex)) + .thenCompose(this::waitReadIndex) .thenCompose(readIndex -> queryStateMachine(request)) .exceptionally(e -> readException2Reply(request, e)); } else { throw new IllegalStateException("Unexpected read option: " + readOption); } } - + private CompletableFuture waitReadIndex(long readIndex) { + final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex(); + return installSnapshot != RaftLog.INVALID_LOG_INDEX + ? JavaUtils.completeExceptionally(newReadException("start waiting for", installSnapshot, false)) + : getState().getReadRequests().waitToAdvance(readIndex); + } private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) { e = JavaUtils.unwrapCompletionException(e); if (e instanceof StateMachineException ) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java index 6112a46009..4ffdbfa181 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -43,7 +44,7 @@ static class ReadIndexQueue { * Map : readIndex -> appliedIndexFuture (when completes, readIndex <= appliedIndex). * Invariant: all keys > lastAppliedIndex. */ - private final NavigableMap> sorted = new TreeMap<>(); + private NavigableMap> sorted = new TreeMap<>(); private final TimeDuration readTimeout; @@ -88,6 +89,11 @@ private void handleTimeout(long readIndex) { removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex)); } + synchronized Collection> clear() { + final Collection> futures = sorted.values(); + sorted = new TreeMap<>(); + return futures; + } /** Complete all the entries less than or equal to the given applied index. */ synchronized void complete(long appliedIndex) { @@ -122,4 +128,10 @@ LongConsumer getAppliedIndexConsumer() { CompletableFuture waitToAdvance(long readIndex) { return readIndexQueue.add(readIndex); } + + void fail(Throwable cause) { + for (CompletableFuture f : readIndexQueue.clear()) { + f.completeExceptionally(cause); + } + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 46b6aaf87f..851e6a1a2e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -276,6 +276,7 @@ private CompletableFuture notifyStateMachineToInstall InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); return future.thenApply(dummy -> reply); } + server.getState().getReadRequests().fail(server.newReadException("wait for", firstAvailableLogIndex, true)); final RaftPeerProto leaderProto; if (!request.hasLastRaftConfigurationLogEntryProto()) { @@ -401,4 +402,4 @@ private RoleInfoProto getRoleInfoProto(RaftPeerProto leader) { .setFollowerInfo(followerInfo) .build(); } -} \ No newline at end of file +} diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 09781b546e..832457e9de 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -169,6 +169,11 @@ static void runTestFollowerLinearizableRead(C cluste } } + @Test + public void testFollowerLinearizableReadFailsWhenInstallingSnapshot() throws Exception { + runWithNewCluster(ReadOnlyRequestTests::runTestFollowerLinearizableReadFailsWhenInstallingSnapshot); + } + @Test public void testFollowerLinearizableReadParallel() throws Exception { runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); @@ -285,4 +290,4 @@ static void runTestReadAfterWrite(C cluster) throws assertReplyAtLeast(2, asyncReply.join()); } } -} \ No newline at end of file +} diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index 94e9433b15..df615bec13 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -40,8 +40,13 @@ import org.junit.jupiter.api.Test; import org.slf4j.event.Level; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong; public abstract class ReadOnlyRequestTests @@ -139,6 +144,76 @@ static void runTestReadOnlyRetryWhenLeaderDown(Retry } } + private static void setInProgressInstallSnapshotIndex(RaftServer.Division server, long index) throws Exception { + final Field snapshotInstallationHandler = server.getClass().getDeclaredField("snapshotInstallationHandler"); + snapshotInstallationHandler.setAccessible(true); + final Object handler = snapshotInstallationHandler.get(server); + final Field inProgressInstallSnapshotIndex = handler.getClass() + .getDeclaredField("inProgressInstallSnapshotIndex"); + inProgressInstallSnapshotIndex.setAccessible(true); + ((AtomicLong) inProgressInstallSnapshotIndex.get(handler)).set(index); + } + + private static void startSnapshotInstallation(RaftServer.Division server, long index) throws Exception { + setInProgressInstallSnapshotIndex(server, index); + final Method getState = server.getClass().getDeclaredMethod("getState"); + getState.setAccessible(true); + final Object state = getState.invoke(server); + final Method getReadRequests = state.getClass().getDeclaredMethod("getReadRequests"); + getReadRequests.setAccessible(true); + final Object readRequests = getReadRequests.invoke(state); + final Method fail = readRequests.getClass().getDeclaredMethod("fail", Throwable.class); + fail.setAccessible(true); + fail.invoke(readRequests, new ReadException(server.getMemberId() + + ": Failed to wait for readIndex as snapshot (" + index + ") installation is started")); + } + + static void assertSnapshotInstallationReadException(Throwable exception) { + final Throwable cause = exception instanceof CompletionException && exception.getCause() != null + ? exception.getCause() : exception; + Assertions.assertInstanceOf(ReadException.class, cause); + Assertions.assertTrue(cause.getMessage().contains("snapshot (1) installation is"), + () -> "Unexpected exception: " + exception); + } + + static void runTestFollowerLinearizableReadFailsWhenInstallingSnapshot(C cluster) + throws Exception { + final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); + + final List followers = cluster.getFollowers(); + Assertions.assertEquals(2, followers.size()); + + final RaftServer.Division follower = followers.get(0); + final RaftPeerId followerId = follower.getId(); + + try (RaftClient leaderClient = cluster.createClient(leaderId); + RaftClient followerClient = cluster.createClient(followerId, RetryPolicies.noRetry())) { + assertReplyExact(1, leaderClient.io().send(INCREMENT)); + assertReplyExact(1, followerClient.io().sendReadOnly(QUERY, followerId)); + + final CompletableFuture writeReply = leaderClient.async().send(WAIT_AND_INCREMENT); + Thread.sleep(100); + final CompletableFuture pendingRead = followerClient.async().sendReadOnly(QUERY, followerId); + Assertions.assertFalse(pendingRead.isDone(), () -> "pendingRead=" + pendingRead); + + startSnapshotInstallation(follower, 1); + try { + final CompletionException pendingException = Assertions.assertThrows(CompletionException.class, + pendingRead::join); + assertSnapshotInstallationReadException(pendingException); + + final ReadException readException = Assertions.assertThrows(ReadException.class, + () -> followerClient.io().sendReadOnly(QUERY, followerId)); + assertSnapshotInstallationReadException(readException); + } finally { + setInProgressInstallSnapshotIndex(follower, -1); + } + + assertReplyExact(2, writeReply.join()); + assertReplyExact(2, followerClient.io().sendReadOnly(QUERY, followerId)); + } + } + static int retrieve(RaftClientReply reply) { Assertions.assertTrue(reply.isSuccess()); return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));