Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1092,12 +1092,15 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}

ReadRequests getReadRequests() {
return getState().getReadRequests();
ReadException newReadException(String op, long installSnapshot, boolean started) {
return new ReadException(getMemberId() + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot
+ ") installation is " + (started ? "started" : "in progress"));
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
return JavaUtils.completeExceptionally(newReadException("get", installSnapshot, false));
}
final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
Expand All @@ -1109,11 +1112,9 @@ private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequ
return JavaUtils.completeExceptionally(e);
}
}

private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderStateImpl leader) {
return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}

private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
Expand All @@ -1123,12 +1124,6 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
}
return queryStateMachine(request);
} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){
/*
Linearizable read using ReadIndex. See Raft paper section 6.4.
1. First obtain readIndex from Leader.
2. Then waits for statemachine to advance at least as far as readIndex.
3. Finally, query the statemachine and return the result.
*/
final LeaderStateImpl leader = role.getLeaderState().orElse(null);

final CompletableFuture<Long> replyFuture;
Expand All @@ -1146,14 +1141,19 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
}

return replyFuture
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(this::waitReadIndex)
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
}

private CompletableFuture<Long> waitReadIndex(long readIndex) {
final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
return installSnapshot != RaftLog.INVALID_LOG_INDEX
? JavaUtils.completeExceptionally(newReadException("start waiting for", installSnapshot, false))
: getState().getReadRequests().waitToAdvance(readIndex);
}
private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof StateMachineException ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand All @@ -43,7 +44,7 @@ static class ReadIndexQueue {
* Map : readIndex -> appliedIndexFuture (when completes, readIndex <= appliedIndex).
* Invariant: all keys > lastAppliedIndex.
*/
private final NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();
private NavigableMap<Long, CompletableFuture<Long>> sorted = new TreeMap<>();

private final TimeDuration readTimeout;

Expand Down Expand Up @@ -88,6 +89,11 @@ private void handleTimeout(long readIndex) {
removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
}

synchronized Collection<CompletableFuture<Long>> clear() {
final Collection<CompletableFuture<Long>> futures = sorted.values();
sorted = new TreeMap<>();
return futures;
}

/** Complete all the entries less than or equal to the given applied index. */
synchronized void complete(long appliedIndex) {
Expand Down Expand Up @@ -122,4 +128,10 @@ LongConsumer getAppliedIndexConsumer() {
CompletableFuture<Long> waitToAdvance(long readIndex) {
return readIndexQueue.add(readIndex);
}

void fail(Throwable cause) {
for (CompletableFuture<Long> f : readIndexQueue.clear()) {
f.completeExceptionally(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstall
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return future.thenApply(dummy -> reply);
}
server.getState().getReadRequests().fail(server.newReadException("wait for", firstAvailableLogIndex, true));

final RaftPeerProto leaderProto;
if (!request.hasLastRaftConfigurationLogEntryProto()) {
Expand Down Expand Up @@ -401,4 +402,4 @@ private RoleInfoProto getRoleInfoProto(RaftPeerProto leader) {
.setFollowerInfo(followerInfo)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste
}
}

@Test
public void testFollowerLinearizableReadFailsWhenInstallingSnapshot() throws Exception {
runWithNewCluster(ReadOnlyRequestTests::runTestFollowerLinearizableReadFailsWhenInstallingSnapshot);
}

@Test
public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
Expand Down Expand Up @@ -285,4 +290,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
Expand Down Expand Up @@ -139,6 +144,76 @@ static <C extends MiniRaftCluster> void runTestReadOnlyRetryWhenLeaderDown(Retry
}
}

private static void setInProgressInstallSnapshotIndex(RaftServer.Division server, long index) throws Exception {
final Field snapshotInstallationHandler = server.getClass().getDeclaredField("snapshotInstallationHandler");
snapshotInstallationHandler.setAccessible(true);
final Object handler = snapshotInstallationHandler.get(server);
final Field inProgressInstallSnapshotIndex = handler.getClass()
.getDeclaredField("inProgressInstallSnapshotIndex");
inProgressInstallSnapshotIndex.setAccessible(true);
((AtomicLong) inProgressInstallSnapshotIndex.get(handler)).set(index);
}

private static void startSnapshotInstallation(RaftServer.Division server, long index) throws Exception {
setInProgressInstallSnapshotIndex(server, index);
final Method getState = server.getClass().getDeclaredMethod("getState");
getState.setAccessible(true);
final Object state = getState.invoke(server);
final Method getReadRequests = state.getClass().getDeclaredMethod("getReadRequests");
getReadRequests.setAccessible(true);
final Object readRequests = getReadRequests.invoke(state);
final Method fail = readRequests.getClass().getDeclaredMethod("fail", Throwable.class);
fail.setAccessible(true);
fail.invoke(readRequests, new ReadException(server.getMemberId()
+ ": Failed to wait for readIndex as snapshot (" + index + ") installation is started"));
}

static void assertSnapshotInstallationReadException(Throwable exception) {
final Throwable cause = exception instanceof CompletionException && exception.getCause() != null
? exception.getCause() : exception;
Assertions.assertInstanceOf(ReadException.class, cause);
Assertions.assertTrue(cause.getMessage().contains("snapshot (1) installation is"),
() -> "Unexpected exception: " + exception);
}

static <C extends MiniRaftCluster> void runTestFollowerLinearizableReadFailsWhenInstallingSnapshot(C cluster)
throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Assertions.assertEquals(2, followers.size());

final RaftServer.Division follower = followers.get(0);
final RaftPeerId followerId = follower.getId();

try (RaftClient leaderClient = cluster.createClient(leaderId);
RaftClient followerClient = cluster.createClient(followerId, RetryPolicies.noRetry())) {
assertReplyExact(1, leaderClient.io().send(INCREMENT));
assertReplyExact(1, followerClient.io().sendReadOnly(QUERY, followerId));

final CompletableFuture<RaftClientReply> writeReply = leaderClient.async().send(WAIT_AND_INCREMENT);
Thread.sleep(100);
final CompletableFuture<RaftClientReply> pendingRead = followerClient.async().sendReadOnly(QUERY, followerId);
Assertions.assertFalse(pendingRead.isDone(), () -> "pendingRead=" + pendingRead);

startSnapshotInstallation(follower, 1);
try {
final CompletionException pendingException = Assertions.assertThrows(CompletionException.class,
pendingRead::join);
assertSnapshotInstallationReadException(pendingException);

final ReadException readException = Assertions.assertThrows(ReadException.class,
() -> followerClient.io().sendReadOnly(QUERY, followerId));
assertSnapshotInstallationReadException(readException);
} finally {
setInProgressInstallSnapshotIndex(follower, -1);
}

assertReplyExact(2, writeReply.join());
assertReplyExact(2, followerClient.io().sendReadOnly(QUERY, followerId));
}
}

static int retrieve(RaftClientReply reply) {
Assertions.assertTrue(reply.isSuccess());
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
Expand Down