diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index f5189ed862..5272d2914b 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -253,6 +253,25 @@ but there are tradeoffs (e.g. Write and Read performance) between different type | **Type** | TimeDuration | | **Default** | 10ms | +| **Property** | `raft.server.read.read-index.batch.enabled` | +|:----------------|:----------------------------------------------------------------------------------| +| **Description** | whether to batch follower-to-leader ReadIndex RPCs for plain linearizable reads | +| **Type** | boolean | +| **Default** | false | + +| **Property** | `raft.server.read.read-index.batch.size` | +|:----------------|:----------------------------------------------------| +| **Description** | maximum number of reads in one opportunistic ReadIndex batch | +| **Type** | int | +| **Default** | 64 | + +When ReadIndex batching is enabled, a follower batches plain linearizable read +requests opportunistically and sends a single ReadIndex request for the reads +already queued when the batch is drained. `batch.size` is a maximum cap, not a +target size; the follower does not wait to fill a batch. Read-after-write +requests bypass batching so that the leader can evaluate each request's +client-specific write index. + | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| | **Description** | whether to check heartbeat for read index. | diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 2d55594782..15043bfa76 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -309,6 +309,28 @@ static TimeDuration repliedIndexBatchInterval(RaftProperties properties) { static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) { setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval); } + + interface Batch { + String PREFIX = ReadIndex.PREFIX + ".batch"; + + String ENABLED_KEY = PREFIX + ".enabled"; + boolean ENABLED_DEFAULT = false; + static boolean enabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, ENABLED_KEY, ENABLED_DEFAULT, getDefaultLog()); + } + static void setEnabled(RaftProperties properties, boolean enabled) { + setBoolean(properties::setBoolean, ENABLED_KEY, enabled); + } + + String BATCH_SIZE_KEY = PREFIX + ".size"; + int BATCH_SIZE_DEFAULT = 64; + static int batchSize(RaftProperties properties) { + return getInt(properties::getInt, BATCH_SIZE_KEY, BATCH_SIZE_DEFAULT, getDefaultLog(), requireMin(1)); + } + static void setBatchSize(RaftProperties properties, int batchSize) { + setInt(properties::setInt, BATCH_SIZE_KEY, batchSize, requireMin(1)); + } + } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1c9cd3f658..a1da0bfcd8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -163,6 +163,7 @@ class RaftServerImpl implements RaftServer.Division, static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction"; + static final String READ_INDEX = CLASS_NAME + ".readIndexAsync"; static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete"; static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection"; static final String START_COMPLETE = CLASS_NAME + ".startComplete"; @@ -241,6 +242,7 @@ public long[] getFollowerMatchIndices() { private final CommitInfoCache commitInfoCache = new CommitInfoCache(); private final WriteIndexCache writeIndexCache; private final NavigableIndices appendLogTermIndices; + private final ReadIndexBatching readIndexBatching; private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this); private final LeaderElectionMetrics leaderElectionMetrics; @@ -301,6 +303,11 @@ public long[] getFollowerMatchIndices() { RaftServerConfigKeys.ThreadPool.clientCached(properties), RaftServerConfigKeys.ThreadPool.clientSize(properties), id + "-client"); + this.readIndexBatching = RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(properties) ? + new ReadIndexBatching( + serverExecutor, + RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties), + this::sendReadIndexAsyncImpl) : null; this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); } @@ -522,6 +529,11 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { public void close() { lifeCycle.checkStateAndClose(() -> { LOG.info("{}: shutdown", getMemberId()); + try { + Optional.ofNullable(readIndexBatching).ifPresent(ReadIndexBatching::close); + } catch (Exception e) { + LOG.warn("{}: Failed to close ReadIndexBatching", getMemberId(), e); + } try { jmxAdapter.unregister(); } catch (Exception e) { @@ -1091,6 +1103,21 @@ ReadRequests getReadRequests() { } private CompletableFuture sendReadIndexAsync(RaftClientRequest clientRequest) { + if (readIndexBatching != null + && !role.getLeaderState().isPresent() + && !clientRequest.getType().getRead().getReadAfterWriteConsistent()) { + return readIndexBatching.submit(clientRequest); + } + return sendReadIndexAsyncImpl(clientRequest); + } + + private CompletableFuture sendReadIndexAsyncImpl(RaftClientRequest clientRequest) { + final LeaderStateImpl leader = role.getLeaderState().orElse(null); + if (leader != null) { + return getReadIndex(clientRequest, leader) + .thenApply(index -> toReadIndexReplyProto(getId(), getMemberId(), true, index)); + } + final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); @@ -1569,6 +1596,7 @@ public CompletableFuture readIndexAsync(ReadIndexRequestPro assertLifeCycleState(LifeCycle.States.RUNNING); final RaftPeerId peerId = RaftPeerId.valueOf(request.getServerRequest().getRequestorId()); + CodeInjectionForTesting.execute(READ_INDEX, getId(), peerId, request); final LeaderStateImpl leader = role.getLeaderState().orElse(null); if (leader == null) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java new file mode 100644 index 0000000000..7167c3ce5b --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexBatching.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.util.JavaUtils; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Opportunistically batch follower-to-leader ReadIndex requests. + * + *

The batch is drained on the server executor without waiting for a timer. {@code batchSize} + * is only a maximum drain cap, not a target size. + */ +class ReadIndexBatching { + private final Executor executor; + private final int batchSize; + private final Function> readIndexAsyncImpl; + + /** Guarded by {@code this}. */ + private final Queue pending = new ArrayDeque<>(); + /** Guarded by {@code this}. */ + private final HashSet inFlight = new HashSet<>(); + /** Guarded by {@code this}; at most one drain task is scheduled or running. */ + private boolean drainScheduled; + /** Guarded by {@code this}. */ + private boolean closed; + + ReadIndexBatching(Executor executor, int batchSize, + Function> readIndexAsyncImpl) { + this.executor = executor; + this.batchSize = batchSize; + this.readIndexAsyncImpl = readIndexAsyncImpl; + } + + CompletableFuture submit(RaftClientRequest request) { + final CompletableFuture future = new CompletableFuture<>(); + final boolean schedule; + synchronized (this) { + if (closed) { + return JavaUtils.completeExceptionally(newClosedException()); + } + pending.add(new Pending(request, future)); + schedule = !drainScheduled; + if (schedule) { + drainScheduled = true; + } + } + + if (schedule) { + scheduleDrain(); + } + return future; + } + + void close() { + close(newClosedException()); + } + + private void close(Throwable throwable) { + final List queued; + final List running; + synchronized (this) { + if (closed) { + return; + } + closed = true; + drainScheduled = false; + queued = new ArrayList<>(pending); + pending.clear(); + running = new ArrayList<>(inFlight); + inFlight.clear(); + } + queued.forEach(p -> p.future.completeExceptionally(throwable)); + running.forEach(batch -> batch.completeExceptionally(throwable)); + } + + private void scheduleDrain() { + try { + executor.execute(this::drain); + } catch (RejectedExecutionException e) { + close(new ReadIndexException("Failed to schedule ReadIndex batch drain.", e)); + } + } + + private static ReadIndexException newClosedException() { + return new ReadIndexException("ReadIndex batching is closed."); + } + + private void drain() { + final Batch batch; + synchronized (this) { + if (closed || pending.isEmpty()) { + drainScheduled = false; + return; + } + batch = pollBatch(); + inFlight.add(batch); + } + + batch.send(readIndexAsyncImpl, () -> onBatchDone(batch)); + + final boolean scheduleNext; + synchronized (this) { + if (closed) { + scheduleNext = false; + } else if (pending.isEmpty()) { + drainScheduled = false; + scheduleNext = false; + } else { + scheduleNext = true; + } + } + if (scheduleNext) { + scheduleDrain(); + } + } + + private Batch pollBatch() { + final List batch = new ArrayList<>(Math.min(batchSize, pending.size())); + for (int i = 0; i < batchSize; i++) { + final Pending next = pending.poll(); + if (next == null) { + break; + } + batch.add(next); + } + return new Batch(batch); + } + + private void onBatchDone(Batch batch) { + synchronized (this) { + inFlight.remove(batch); + } + } + + private static class Pending { + private final RaftClientRequest request; + private final CompletableFuture future; + + Pending(RaftClientRequest request, CompletableFuture future) { + this.request = request; + this.future = future; + } + } + + private static class Batch { + private final AtomicBoolean completed = new AtomicBoolean(); + private final List pending; + + Batch(List pending) { + this.pending = pending; + } + + void send(Function> readIndexAsyncImpl, + Runnable onComplete) { + if (pending.isEmpty()) { + return; + } + if (completed.get()) { + return; + } + + final CompletableFuture replyFuture; + try { + // Plain reads only need one ReadIndex RPC for the batch. Read-after-write requests + // bypass batching before reaching this class, since their client request carries + // per-client write-index state. + replyFuture = readIndexAsyncImpl.apply(pending.get(0).request); + } catch (Throwable t) { + completeExceptionally(t); + onComplete.run(); + return; + } + + replyFuture.whenComplete((reply, throwable) -> { + try { + if (throwable != null) { + completeExceptionally(JavaUtils.unwrapCompletionException(throwable)); + } else { + complete(reply); + } + } finally { + onComplete.run(); + } + }); + } + + private void complete(ReadIndexReplyProto reply) { + if (completed.compareAndSet(false, true)) { + pending.forEach(p -> p.future.complete(reply)); + } + } + + private void completeExceptionally(Throwable throwable) { + if (completed.compareAndSet(false, true)) { + pending.forEach(p -> p.future.completeExceptionally(throwable)); + } + } + } +} diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 09781b546e..4afa75fa8b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -68,10 +68,15 @@ public abstract class LinearizableReadTests public abstract Type readIndexType(); + public boolean readIndexBatchEnabled() { + return false; + } + public final void assertRaftProperties(RaftProperties p) { assertOption(LINEARIZABLE, p); assertEquals(isLeaderLeaseEnabled(), RaftServerConfigKeys.Read.leaderLeaseEnabled(p)); assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p)); + assertEquals(readIndexBatchEnabled(), RaftServerConfigKeys.Read.ReadIndex.Batch.enabled(p)); } protected void runWithNewCluster(CheckedConsumer testCase) throws Exception { @@ -88,6 +93,7 @@ public void setup() { RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); + RaftServerConfigKeys.Read.ReadIndex.Batch.setEnabled(p, readIndexBatchEnabled()); // Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path. RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true); @@ -285,4 +291,4 @@ static void runTestReadAfterWrite(C cluster) throws assertReplyAtLeast(2, asyncReply.join()); } } -} \ No newline at end of file +} diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java new file mode 100644 index 0000000000..8ccdf4e92c --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestReadIndexBatching.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; +import org.apache.ratis.protocol.exceptions.ReadIndexException; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +class TestReadIndexBatching { + @Test + void testBatchSizeMustBePositive() { + final RaftProperties properties = new RaftProperties(); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.setBatchSize(properties, 0)); + + properties.setInt(RaftServerConfigKeys.Read.ReadIndex.Batch.BATCH_SIZE_KEY, 0); + Assertions.assertThrows(IllegalArgumentException.class, + () -> RaftServerConfigKeys.Read.ReadIndex.Batch.batchSize(properties)); + } + + @Test + void testSubmitSchedulesOneOpportunisticDrain() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); + final ReadIndexBatching batching = new ReadIndexBatching( + executor, 64, request -> { + readIndexCount.incrementAndGet(); + return CompletableFuture.completedFuture(readIndexReply); + }); + + final CompletableFuture first = batching.submit(null); + Assertions.assertFalse(first.isDone()); + Assertions.assertEquals(1, executor.getTaskCount()); + + final CompletableFuture second = batching.submit(null); + Assertions.assertFalse(second.isDone()); + Assertions.assertEquals(1, executor.getTaskCount()); + + executor.runNext(); + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertSame(readIndexReply, first.get()); + Assertions.assertSame(readIndexReply, second.get()); + } + + @Test + void testBatchSizeCapsEachDrain() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexReplyProto readIndexReply = ReadIndexReplyProto.getDefaultInstance(); + final ReadIndexBatching batching = new ReadIndexBatching( + executor, 2, request -> { + readIndexCount.incrementAndGet(); + return CompletableFuture.completedFuture(readIndexReply); + }); + + final CompletableFuture first = batching.submit(null); + final CompletableFuture second = batching.submit(null); + final CompletableFuture third = batching.submit(null); + + executor.runNext(); + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertSame(readIndexReply, first.get()); + Assertions.assertSame(readIndexReply, second.get()); + Assertions.assertFalse(third.isDone()); + Assertions.assertEquals(1, executor.getTaskCount()); + + executor.runNext(); + Assertions.assertEquals(2, readIndexCount.get()); + Assertions.assertSame(readIndexReply, third.get()); + } + + @Test + void testReadIndexFailureCompletesBatchFuturesExceptionally() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); + final RuntimeException failure = new RuntimeException("read index failed"); + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(failure); + final ReadIndexBatching batching = new ReadIndexBatching(executor, 64, request -> failed); + + final CompletableFuture first = batching.submit(null); + final CompletableFuture second = batching.submit(null); + + executor.runNext(); + Assertions.assertSame(failure, + Assertions.assertThrows(ExecutionException.class, first::get).getCause()); + Assertions.assertSame(failure, + Assertions.assertThrows(ExecutionException.class, second::get).getCause()); + } + + @Test + void testCloseCompletesQueuedAndInFlightBatchesExceptionally() throws Exception { + final CapturingExecutor executor = new CapturingExecutor(); + final AtomicInteger readIndexCount = new AtomicInteger(); + final CompletableFuture readIndexFuture = new CompletableFuture<>(); + final ReadIndexBatching batching = new ReadIndexBatching( + executor, 1, request -> { + readIndexCount.incrementAndGet(); + return readIndexFuture; + }); + + final CompletableFuture inFlight = batching.submit(null); + final CompletableFuture queued = batching.submit(null); + executor.runNext(); + + Assertions.assertEquals(1, readIndexCount.get()); + Assertions.assertFalse(inFlight.isDone()); + Assertions.assertFalse(queued.isDone()); + + batching.close(); + + assertReadIndexException(inFlight); + assertReadIndexException(queued); + + readIndexFuture.complete(ReadIndexReplyProto.getDefaultInstance()); + assertReadIndexException(inFlight); + } + + @Test + void testScheduleFailureClosesBatching() throws Exception { + final ReadIndexBatching batching = new ReadIndexBatching( + command -> { + throw new RejectedExecutionException("closed"); + }, 64, request -> CompletableFuture.completedFuture(ReadIndexReplyProto.getDefaultInstance())); + + final CompletableFuture rejected = batching.submit(null); + assertReadIndexException(rejected); + + final CompletableFuture afterClose = batching.submit(null); + assertReadIndexException(afterClose); + } + + @Test + void testSubmitAfterCloseCompletesExceptionally() { + final AtomicInteger readIndexCount = new AtomicInteger(); + final ReadIndexBatching batching = new ReadIndexBatching( + Runnable::run, 64, request -> { + readIndexCount.incrementAndGet(); + return new CompletableFuture(); + }); + + batching.close(); + + final CompletableFuture reply = batching.submit(null); + final ExecutionException e = Assertions.assertThrows(ExecutionException.class, reply::get); + Assertions.assertTrue(e.getCause() instanceof ReadIndexException); + Assertions.assertEquals(0, readIndexCount.get()); + } + + private static void assertReadIndexException(CompletableFuture future) throws Exception { + final ExecutionException e = Assertions.assertThrows(ExecutionException.class, future::get); + Assertions.assertTrue(e.getCause() instanceof ReadIndexException); + } + + private static class CapturingExecutor implements Executor { + private final List tasks = new ArrayList<>(); + + public int getTaskCount() { + return tasks.size(); + } + + @Override + public void execute(Runnable command) { + tasks.add(command); + } + + void runNext() { + tasks.remove(0).run(); + } + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..663dec30bb --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchAppliedIndexLeaderLeaseWithGrpc + extends TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java new file mode 100644 index 0000000000..8c2fe1c8f0 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchAppliedIndexWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchAppliedIndexWithGrpc + extends TestLinearizableReadAppliedIndexWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..5b90839cd5 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchLeaderLeaseWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchLeaderLeaseWithGrpc + extends TestLinearizableLeaderLeaseReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java new file mode 100644 index 0000000000..a19ac2fb3b --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadIndexBatchWithGrpc.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +public class TestLinearizableReadIndexBatchWithGrpc + extends TestLinearizableReadWithGrpc { + @Override + public boolean readIndexBatchEnabled() { + return true; + } +}