diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java index 3c3be83e79..ea0c3a3428 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java @@ -33,6 +33,8 @@ public final class RatisAttributes { public static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("raft.operation.name"); public static final AttributeKey OPERATION_TYPE = AttributeKey.stringKey("raft.operation.type"); + /** Number of log entries in a single {@code AppendEntries} RPC (0 for heartbeat). */ + public static final AttributeKey APPEND_ENTRIES_COUNT = AttributeKey.longKey("raft.append.entries.count"); private RatisAttributes() { } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/SpanNames.java b/ratis-common/src/main/java/org/apache/ratis/trace/SpanNames.java new file mode 100644 index 0000000000..4fab8d67e6 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/SpanNames.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.trace; + +public final class SpanNames { + private static final String SEPARATOR = "."; + + public static final String ASYNC_SEND = "Async::send"; + + public static final String RAFT_SERVER_PREFIX = "raft" + SEPARATOR + "server"; + public static final String SUBMIT_CLIENT_REQUEST_ASYNC = RAFT_SERVER_PREFIX + SEPARATOR + "submitClientRequestAsync"; + public static final String APPEND_ENTRIES_ASYNC = RAFT_SERVER_PREFIX + SEPARATOR + "appendEntriesAsync"; + + private SpanNames() { + } +} + diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java index 0ab34e689e..e7145eb947 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java @@ -43,7 +43,7 @@ public static CompletableFuture asyncSend( return action.get(); } return TraceUtils.traceAsyncMethod(action, - () -> createClientOperationSpan(type, server, "Async::send")); + () -> createClientOperationSpan(type, server, SpanNames.ASYNC_SEND)); } private static Span createClientOperationSpan(RaftClientRequest.Type type, RaftPeerId server, diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java index 9670f0d763..35f8fef1a5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java @@ -20,9 +20,14 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.function.CheckedSupplier; +import java.io.IOException; import java.util.concurrent.CompletableFuture; /** Server-side OpenTelemetry helpers. */ @@ -56,4 +61,34 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request, span.setAttribute(RatisAttributes.MEMBER_ID, memberId); return span; } + + /** + * Traces follower handling of {@link AppendEntriesRequestProto} when the leader attached trace + * context (client-originated) for replication. + */ + public static CompletableFuture traceAppendEntriesAsync( + CheckedSupplier, IOException> action, + AppendEntriesRequestProto request, String memberId) throws IOException { + if (!TraceUtils.isEnabled()) { + return action.get(); + } + final RaftRpcRequestProto rpc = request.getServerRequest(); + final SpanContextProto spanContext = rpc.getSpanContext(); + // If the leader sent no parent span context, still trace as a root span + // rather than skipping tracing entirely. + final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty()) + ? Context.root() + : TraceUtils.extractContextFromProto(spanContext); + return TraceUtils.traceAsyncMethod(action, () -> { + final Span span = TraceUtils.getGlobalTracer() + .spanBuilder(SpanNames.APPEND_ENTRIES_ASYNC) + .setParent(remoteContext) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + span.setAttribute(RatisAttributes.MEMBER_ID, memberId); + span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId()))); + span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount()); + return span; + }); + } } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java index f350ca8884..3dc3e228f1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java @@ -81,7 +81,7 @@ public static void setTracerWhenEnabled(boolean enabled) { } } - static boolean isEnabled() { + public static boolean isEnabled() { return TRACER.get() != null; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1c986ca638..5e5cdaee25 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -353,6 +354,7 @@ boolean isApplied() { private final LogAppenderMetrics logAppenderMetrics; private final long followerMaxGapThreshold; private final PendingStepDown pendingStepDown; + private final LeaderTracer leaderTracer; private final ReadIndexHeartbeats readIndexHeartbeats; private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; @@ -387,6 +389,7 @@ boolean isApplied() { this.logMetadataEnabled = RaftServerConfigKeys.Log.logMetadataEnabled(properties); long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties); double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties); + leaderTracer = new LeaderTracer(); if (followerGapRatioMax == -1) { this.followerMaxGapThreshold = -1; @@ -483,6 +486,7 @@ CompletableFuture stop() { raftServerMetrics.unregister(); pendingRequests.close(); watchRequests.close(); + leaderTracer.close(); return f; } @@ -558,7 +562,9 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques LOG.debug("{}: addPendingRequest at {}, entry={}", this, request, LogProtoUtils.toLogEntryString(entry.getLogEntry())); } - return pendingRequests.add(permit, request, entry); + final PendingRequest pending = pendingRequests.add(permit, request, entry); + leaderTracer.tracePendingRequest(pending); + return pending; } CompletableFuture streamAsync(RaftClientRequest request) { @@ -645,9 +651,10 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo List entries, TermIndex previous, long callId) { final boolean initializing = !isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); + final SpanContextProto tracing = leaderTracer.traceAppendEntries(entries); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries, ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()), - initializing, previous, server.getCommitInfos(), callId); + initializing, previous, server.getCommitInfos(), callId, tracing); } /** @@ -1243,6 +1250,7 @@ private boolean checkLeaderLease() { void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) { final PendingRequest pending = pendingRequests.remove(termIndex); + leaderTracer.removePendingRequest(pending); final LongSupplier replyMethod = () -> { cacheEntry.updateResult(reply); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java new file mode 100644 index 0000000000..0f4a43e055 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderTracer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; +import org.apache.ratis.trace.TraceUtils; +import org.apache.ratis.util.AutoCloseableLock; + +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +class LeaderTracer { + private final AppendEntriesSpans appendEntriesSpans; + public LeaderTracer() { + appendEntriesSpans = new AppendEntriesSpans(); + } + /** + * Client-originated trace context keyed by log index and propagated in appendEntries requests, + * so follower appendEntries spans can join the same trace as the client write. + */ + static class AppendEntriesSpans { + private final NavigableMap sorted = new TreeMap<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + SpanContextProto get(long first, long last) { + try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock.readLock())) { + for (SpanContextProto sc : sorted.subMap(first, true, last, true).values()) { + if (sc != null && !sc.getContextMap().isEmpty()) { + return sc; + } + } + } + return null; + } + + void put(long index, SpanContextProto spanContext) { + try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock.writeLock())) { + sorted.put(index, spanContext); + } + } + + void remove(long index) { + try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock.writeLock())) { + sorted.remove(index); + } + } + + void clear() { + try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock.writeLock())) { + sorted.clear(); + } + } + } + + void tracePendingRequest(PendingRequest pending) { + if (pending == null || !TraceUtils.isEnabled()) { + return; + } + final SpanContextProto spanContext = pending.getRequest().getSpanContext(); + if (spanContext != null && !spanContext.getContextMap().isEmpty()) { + appendEntriesSpans.put(pending.getTermIndex().getIndex(), spanContext); + } + } + + void removePendingRequest(PendingRequest pending) { + appendEntriesSpans.remove(pending.getTermIndex().getIndex()); + } + + SpanContextProto traceAppendEntries(List entries) { + if (entries == null || entries.isEmpty()) { + return null; + } + final long first = entries.get(0).getIndex(); + final long last = entries.get(entries.size() - 1).getIndex(); + return appendEntriesSpans.get(first, last); + } + + void close() { + appendEntriesSpans.clear(); + } +} \ No newline at end of file diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c0e93338a6..7ad962d729 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -101,6 +101,7 @@ import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.trace.SpanNames; import org.apache.ratis.trace.TraceServer; import org.apache.ratis.trace.TraceUtils; import org.apache.ratis.util.CodeInjectionForTesting; @@ -982,7 +983,7 @@ public CompletableFuture submitClientRequestAsync( RaftClientRequest request) throws IOException { return TraceServer.traceAsyncMethod( () -> submitClientRequestAsyncInternal(request), - request, getMemberId().toString(), "raft.server.submitClientRequestAsync"); + request, getMemberId().toString(), SpanNames.SUBMIT_CLIENT_REQUEST_ASYNC); } private CompletableFuture submitClientRequestAsyncInternal( @@ -1553,7 +1554,6 @@ public CompletableFuture appendEntriesAsync(AppendEntri try { final RaftPeerId leaderId = RaftPeerId.valueOf(request.getRequestorId()); final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(request.getRaftGroupId()); - CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, previous, r); assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1562,8 +1562,8 @@ public CompletableFuture appendEntriesAsync(AppendEntri } assertGroup(getMemberId(), leaderId, leaderGroupId); assertEntries(r, previous, state); - - return appendEntriesAsync(leaderId, request.getCallId(), previous, r); + return TraceServer.traceAppendEntriesAsync(() -> appendEntriesAsync(leaderId, request.getCallId(), previous, r), + r, getMemberId().toString()); } catch(Exception t) { LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r, stateMachine::toStateMachineLogEntryString), t); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 19d4ce6a75..bbe438c0c0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -157,9 +157,13 @@ static AppendEntriesReplyProto toAppendEntriesReplyProto( static AppendEntriesRequestProto toAppendEntriesRequestProto( RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm, List entries, long leaderCommit, boolean initializing, - TermIndex previous, Collection commitInfos, long callId) { + TermIndex previous, Collection commitInfos, long callId, + SpanContextProto tracingContext) { final RaftRpcRequestProto.Builder rpcRequest = ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId) .setCallId(callId); + if (tracingContext != null && !tracingContext.getContextMap().isEmpty()) { + rpcRequest.setSpanContext(tracingContext); + } final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto .newBuilder() .setServerRequest(rpcRequest) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java index 300cf51cde..81e9289f42 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java @@ -19,10 +19,16 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; import io.opentelemetry.sdk.trace.data.SpanData; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroup; @@ -32,13 +38,21 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; +import org.apache.ratis.trace.RatisAttributes; +import org.apache.ratis.trace.SpanNames; import org.apache.ratis.trace.TraceConfigKeys; +import org.apache.ratis.trace.TraceServer; import org.apache.ratis.trace.TraceUtils; +import org.apache.ratis.util.JavaUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,7 +75,7 @@ public void testSubmitClientRequestAsync() throws Exception { ); assertTrue( spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER - && s.getName().equals("raft.server.submitClientRequestAsync")), + && s.getName().equals(SpanNames.SUBMIT_CLIENT_REQUEST_ASYNC)), "Expected at least one span with SpanKind.SERVER" ); @@ -75,7 +89,7 @@ public void testSubmitClientRequestAsyncTracingDisabled() throws Exception { assertEquals(1, spans.size()); assertTrue( spans.stream().noneMatch(s -> s.getKind() == SpanKind.SERVER - && s.getName().equals("raft.server.submitClientRequestAsync")), + && s.getName().equals(SpanNames.SUBMIT_CLIENT_REQUEST_ASYNC)), "Expected no SERVER span when tracing is disabled" ); assertTrue( @@ -84,6 +98,114 @@ public void testSubmitClientRequestAsyncTracingDisabled() throws Exception { ); } + @Test + public void testTraceAppendEntriesAsyncCreatesInternalSpan() throws Exception { + long callId = randomCallId(); + int entriesCount = 3; + final List spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), callId, entriesCount, injectedSpanContext())); + final SpanData appendSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals(SpanNames.APPEND_ENTRIES_ASYNC)) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Expected INTERNAL span " + SpanNames.APPEND_ENTRIES_ASYNC)); + assertEquals("n1", appendSpan.getAttributes().get(RatisAttributes.MEMBER_ID)); + assertEquals("leader1", appendSpan.getAttributes().get(RatisAttributes.PEER_ID)); + assertEquals(entriesCount, appendSpan.getAttributes().get(RatisAttributes.APPEND_ENTRIES_COUNT)); + } + + @Test + public void testTraceAppendEntriesAsyncTracingDisabled() throws Exception { + int entriesCount = 1; + final List spans = traceAppendEntriesAndCollectNewSpans(false, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext())); + assertTrue( + spans.stream().noneMatch(s -> s.getName().equals(SpanNames.APPEND_ENTRIES_ASYNC)), + "Expected no appendEntries span when tracing disabled, got: " + spans); + } + + @Test + public void testTraceAppendEntriesAsyncSkipsWhenSpanContextEmpty() throws Exception { + int entriesCount = 1; + final AppendEntriesRequestProto request = newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, SpanContextProto.getDefaultInstance()); + final List spans = traceAppendEntriesAndCollectNewSpans(true, request); + assertEquals(1, + spans.stream().filter(s -> s.getName().equals(SpanNames.APPEND_ENTRIES_ASYNC)).count()); + } + + @Test + public void testTraceAppendEntriesAsyncSpanRecordsErrorOnFailure() throws Exception { + int entriesCount = 0; + final List spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext()), + () -> JavaUtils.completeExceptionally(new IOException("Planned record error"))); + assertEquals(1, + spans.stream().filter(s -> s.getName().equals(SpanNames.APPEND_ENTRIES_ASYNC)).count()); + final SpanData appendSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals(SpanNames.APPEND_ENTRIES_ASYNC)) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Expected INTERNAL span " + SpanNames.APPEND_ENTRIES_ASYNC)); + assertEquals(StatusCode.ERROR, appendSpan.getStatus().getStatusCode()); + } + + private static List traceAppendEntriesAndCollectNewSpans( + boolean enableTracing, AppendEntriesRequestProto request) throws Exception { + return traceAppendEntriesAndCollectNewSpans(enableTracing, request, + () -> CompletableFuture.completedFuture(AppendEntriesReplyProto.getDefaultInstance())); + } + + private static List traceAppendEntriesAndCollectNewSpans( + boolean enableTracing, + AppendEntriesRequestProto request, + Supplier> action) + throws Exception { + final int before = openTelemetryExtension.getSpans().size(); + try { + TraceUtils.setTracerWhenEnabled(enableTracing); + final CompletableFuture traced = + TraceServer.traceAppendEntriesAsync(action::get, request, "n1"); + try { + traced.join(); + } catch (CompletionException e) { + // allowed for failure-path test + } + } finally { + TraceUtils.setTracerWhenEnabled(false); + } + final List after = openTelemetryExtension.getSpans(); + return new ArrayList<>(after.subList(before, after.size())); + } + + private static SpanContextProto injectedSpanContext() { + final Span remoteParent = openTelemetryExtension.getOpenTelemetry().getTracer("test") + .spanBuilder("remote-parent") + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + try { + return TraceUtils.injectContextToProto(Context.current().with(remoteParent)); + } finally { + remoteParent.end(); + } + } + + private static AppendEntriesRequestProto newAppendEntriesRequest( + RaftPeerId leaderId, long callId, int entriesCount, SpanContextProto spanContext) { + final RaftRpcRequestProto.Builder rpc = RaftRpcRequestProto.newBuilder() + .setRequestorId(leaderId.toByteString()) + .setCallId(callId); + if (spanContext != null && !spanContext.getContextMap().isEmpty()) { + rpc.setSpanContext(spanContext); + } + final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto.newBuilder() + .setServerRequest(rpc.build()) + .setLeaderTerm(1L) + .setLeaderCommit(0L); + for (int i = 0; i < entriesCount; i++) { + b.addEntries(LogEntryProto.newBuilder().setTerm(1L).setIndex(i + 1L).build()); + } + return b.build(); + } + private static List submitClientRequestAndCollectNewSpans(boolean enableTracing) throws Exception { final int before = openTelemetryExtension.getSpans().size(); @@ -137,5 +259,10 @@ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type typ clientSpan.end(); } } + + private long randomCallId() { + return (long) (Math.random() * 100); + } + }