Skip to content
Draft
25 changes: 25 additions & 0 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,31 @@ but there are tradeoffs (e.g. Write and Read performance) between different type
| **Type** | TimeDuration |
| **Default** | 10ms |

| **Property** | `raft.server.read.read-index.batch.enabled` |
|:----------------|:----------------------------------------------------------------------------------|
| **Description** | whether to batch follower-to-leader ReadIndex RPCs for plain linearizable reads |
| **Type** | boolean |
| **Default** | false |

| **Property** | `raft.server.read.read-index.batch.interval` |
|:----------------|:----------------------------------------------------|
| **Description** | positive maximum time to collect reads into one ReadIndex batch |
| **Type** | TimeDuration |
| **Default** | 500us |

| **Property** | `raft.server.read.read-index.batch.size` |
|:----------------|:----------------------------------------------------|
| **Description** | maximum number of reads in one ReadIndex batch |
| **Type** | int |
| **Default** | 64 |

When ReadIndex batching is enabled, a follower batches plain linearizable read
requests and sends a single ReadIndex request for each sealed batch. A batch is
sealed when either `batch.interval` expires or `batch.size` is reached. New
reads after sealing are assigned to a later batch. Read-after-write requests
bypass batching so that the leader can evaluate each request's client-specific
write index.

| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
| **Description** | whether to check heartbeat for read index. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,38 @@ static TimeDuration repliedIndexBatchInterval(RaftProperties properties) {
static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}

interface Batch {
String PREFIX = ReadIndex.PREFIX + ".batch";

String ENABLED_KEY = PREFIX + ".enabled";
boolean ENABLED_DEFAULT = false;
static boolean enabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT, getDefaultLog());
}
static void setEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ENABLED_KEY, enabled);
}

String BATCH_INTERVAL_KEY = PREFIX + ".interval";
TimeDuration BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(500, TimeUnit.MICROSECONDS);
static TimeDuration batchInterval(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(BATCH_INTERVAL_DEFAULT.getUnit()),
BATCH_INTERVAL_KEY, BATCH_INTERVAL_DEFAULT, getDefaultLog(), requirePositive());
}
static void setBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, BATCH_INTERVAL_KEY, interval, requirePositive());
}

String BATCH_SIZE_KEY = PREFIX + ".size";
int BATCH_SIZE_DEFAULT = 64;
static int batchSize(RaftProperties properties) {
return getInt(properties::getInt, BATCH_SIZE_KEY, BATCH_SIZE_DEFAULT, getDefaultLog(), requireMin(1));
}
static void setBatchSize(RaftProperties properties, int batchSize) {
setInt(properties::setInt, BATCH_SIZE_KEY, batchSize, requireMin(1));
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction";
static final String READ_INDEX = CLASS_NAME + ".readIndexAsync";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
static final String START_COMPLETE = CLASS_NAME + ".startComplete";
Expand Down Expand Up @@ -241,6 +242,7 @@ public long[] getFollowerMatchIndices() {
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
private final NavigableIndices appendLogTermIndices;
private final ReadIndexBatching readIndexBatching;

private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this);
private final LeaderElectionMetrics leaderElectionMetrics;
Expand Down Expand Up @@ -279,6 +281,11 @@ public long[] getFollowerMatchIndices() {
this.dataStreamMap = new DataStreamMapImpl(id);
this.readOption = RaftServerConfigKeys.Read.option(properties);
this.writeIndexCache = new WriteIndexCache(properties);
this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ?
new ReadIndexBatching(
RaftServerConfigKeys.Read.ReadIndex.Batch.batchInterval(properties),
RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties),
this::sendReadIndexAsyncImpl) : null;
this.transactionManager = new TransactionManager(id);
TraceUtils.setTracerWhenEnabled(properties);

Expand Down Expand Up @@ -522,6 +529,11 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
public void close() {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown", getMemberId());
try {
Optional.ofNullable(readIndexBatching).ifPresent(ReadIndexBatching::close);
} catch (Exception e) {
LOG.warn("{}: Failed to close ReadIndexBatching", getMemberId(), e);
}
try {
jmxAdapter.unregister();
} catch (Exception e) {
Expand Down Expand Up @@ -1091,6 +1103,20 @@ ReadRequests getReadRequests() {
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
if (readIndexBatching != null
&& !clientRequest.getType().getRead().getReadAfterWriteConsistent()) {
return readIndexBatching.submit(clientRequest);
}
return sendReadIndexAsyncImpl(clientRequest);
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsyncImpl(RaftClientRequest clientRequest) {
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader != null) {
return getReadIndex(clientRequest, leader)
.thenApply(index -> toReadIndexReplyProto(getId(), getMemberId(), true, index));
}

final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
Expand Down Expand Up @@ -1569,6 +1595,7 @@ public CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestPro
assertLifeCycleState(LifeCycle.States.RUNNING);

final RaftPeerId peerId = RaftPeerId.valueOf(request.getServerRequest().getRequestorId());
CodeInjectionForTesting.execute(READ_INDEX, getId(), peerId, request);

final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Batch follower-to-leader ReadIndex requests. */
class ReadIndexBatching {
private static final Logger LOG = LoggerFactory.getLogger(ReadIndexBatching.class);

private final TimeoutExecutor scheduler;
private final TimeDuration batchInterval;
private final int batchSize;
private final Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl;

/** Guarded by {@code this}; the monitor provides visibility, so volatile is not needed. */
private Batch open;
/** Guarded by {@code this}. */
private boolean closed;

ReadIndexBatching(TimeDuration batchInterval, int batchSize,
Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl) {
this(TimeoutScheduler.getInstance(), batchInterval, batchSize, readIndexAsyncImpl);
}

ReadIndexBatching(TimeoutExecutor scheduler, TimeDuration batchInterval, int batchSize,
Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl) {
this.scheduler = scheduler;
this.batchInterval = batchInterval;
this.batchSize = batchSize;
this.readIndexAsyncImpl = readIndexAsyncImpl;
}

CompletableFuture<ReadIndexReplyProto> submit(RaftClientRequest request) {
final CompletableFuture<ReadIndexReplyProto> future = new CompletableFuture<>();
final Batch batch;
final boolean schedule;
final boolean seal;
synchronized (this) {
if (closed) {
return JavaUtils.completeExceptionally(newClosedException());
}
schedule = open == null;
if (schedule) {
open = new Batch();
}
batch = open;
batch.add(request, future);
seal = batch.size() >= batchSize;
if (seal) {
open = null;
}
}

if (schedule) {
scheduler.onTimeout(batchInterval, () -> seal(batch), LOG,
() -> "Failed to seal ReadIndex batch");
}
if (seal) {
batch.seal(readIndexAsyncImpl);
}
return future;
}

void close() {
final Batch batch;
synchronized (this) {
closed = true;
batch = open;
open = null;
}
if (batch != null) {
batch.cancel(newClosedException());
}
}

private static ReadIndexException newClosedException() {
return new ReadIndexException("ReadIndex batching is closed.");
}

private void seal(Batch batch) {
synchronized (this) {
if (open == batch) {
open = null;
}
}
batch.seal(readIndexAsyncImpl);
}

private static class Pending {
private final RaftClientRequest request;
private final CompletableFuture<ReadIndexReplyProto> future;

Pending(RaftClientRequest request, CompletableFuture<ReadIndexReplyProto> future) {
this.request = request;
this.future = future;
}
}

private static class Batch {
private final AtomicBoolean sealed = new AtomicBoolean();
/** Appended only while this batch is {@code open}; sealing first removes it from {@code open}. */
private final List<Pending> pending = new ArrayList<>();

void add(RaftClientRequest request, CompletableFuture<ReadIndexReplyProto> future) {
pending.add(new Pending(request, future));
}

int size() {
return pending.size();
}

void seal(Function<RaftClientRequest, CompletableFuture<ReadIndexReplyProto>> readIndexAsyncImpl) {
if (!sealed.compareAndSet(false, true)) {
return;
}
if (pending.isEmpty()) {
return;
}

final CompletableFuture<ReadIndexReplyProto> replyFuture;
try {
// Plain reads only need one ReadIndex RPC for the batch. Read-after-write requests
// bypass batching before reaching this class, since their client request carries
// per-client write-index state.
replyFuture = readIndexAsyncImpl.apply(pending.get(0).request);
} catch (Throwable t) {
completeExceptionally(t);
return;
}

replyFuture.whenComplete((reply, throwable) -> {
if (throwable != null) {
completeExceptionally(JavaUtils.unwrapCompletionException(throwable));
} else {
pending.forEach(p -> p.future.complete(reply));
}
});
}

private void cancel(Throwable throwable) {
if (sealed.compareAndSet(false, true)) {
completeExceptionally(throwable);
}
}

private void completeExceptionally(Throwable throwable) {
pending.forEach(p -> p.future.completeExceptionally(throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>

public abstract Type readIndexType();

public boolean readIndexBatchEnabled() {
return false;
}

public final void assertRaftProperties(RaftProperties p) {
assertOption(LINEARIZABLE, p);
assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p));
assertEquals(readIndexBatchEnabled(), RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(p));
}

protected void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
Expand All @@ -88,6 +93,7 @@ public void setup() {
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, readIndexBatchEnabled());

// Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path.
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true);
Expand Down Expand Up @@ -285,4 +291,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Loading