Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7bb4c17
RATIS-1925. Support Zero-Copy in GrpcClientProtocolService (#1007)
duongkame Jan 13, 2024
404a9d5
RATIS-1990. Refactor appendEntries processing to support reference co…
duongkame Jan 18, 2024
635f1bc
RATIS-2009. ReferenceCount should work for all LogEntry types. (#1021)
duongkame Jan 19, 2024
298d496
RATIS-1934. Support Zero-Copy in GrpcServerProtocolService (#1014)
duongkame Jan 19, 2024
41f3aa7
RATIS-2007. Zero-copy buffers are not released (#1027)
duongkame Jan 25, 2024
5be8a36
RATIS-2018. Zero-copy buffers are not released - 2nd chunk (#1032)
duongkame Jan 30, 2024
1a463bc
RATIS-1978. Add tests assertions to verify all zero-copy messages are…
duongkame Jan 31, 2024
41524bc
RATIS-1997. Refactor StateMachine interface to use ReferenceCountedOb…
duongkame Feb 1, 2024
5f00b74
RATIS-2020. Refactor TransactionContext to supply LogEntryProto via a…
duongkame Feb 21, 2024
6e8a24d
RATIS-2028. Refactor RaftLog to supply log as ReferenceCountedObject …
duongkame Mar 12, 2024
8756c32
RATIS-2026. LogAppender to consume log entries with reference count (…
duongkame Mar 27, 2024
98c1be0
RATIS-1979. Allow StateMachine.read to return a ReferentCountedObject…
duongkame Apr 2, 2024
3c3702a
RATIS-2059. Missing reference count when putting log entries to cache…
duongkame Apr 17, 2024
5ea8b33
RATIS-2077. Timedout StateMachine retainRead is released twice (#1081)
duongkame May 11, 2024
49bbfad
RATIS-2092. Add metrics to expose number of zero-copy unclosed messag…
duongkame May 23, 2024
44eff11
RATIS-2096. Add a conf to enable/disable zero copy. (#1099)
duongkame May 24, 2024
866fd77
RATIS-2093. Decouple metadata and configuration entries from appendEn…
duongkame May 24, 2024
6d1ed51
RATIS-2094. Avoid corruptions from TransactionContext's stateMachineL…
duongkame Jun 3, 2024
c3e9cf1
RATIS-2114. Corruption due to SegmentedRaftLogWorker queue LogEntry w…
duongkame Jun 21, 2024
bfb4b53
RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy. (#1164)
szetszwo Oct 11, 2024
6a9ad67
RATIS-2164. LeakDetector has a race condition. (#1163)
szetszwo Oct 16, 2024
bc16c96
RATIS-2173. Fix zero-copy bugs for non-gRPC cases. (#1167)
szetszwo Oct 20, 2024
2eb5a62
RATIS-2220. Skip further tests after leak detected (#1193)
adoroszlai Dec 23, 2024
5981d17
RATIS-2225. RaftClientRequest leak in RaftServerImpl. (#1198)
szetszwo Dec 23, 2024
b159061
RATIS-2227. LogEntryProto leak in SegmentedRaftLog (#1199)
jianghuazhu Dec 25, 2024
9e5d104
RATIS-2414. Add leak detection for ZeroCopyMessageMarshaller. (#1355)
slfan1989 Feb 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ratis-common/dev-support/findbugsExcludeFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,8 @@
<Class name="org.apache.ratis.util.TimeDuration$Abbreviation" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
</FindBugsFilter>
<Match>
<Class name="org.apache.ratis.util.ReferenceCountedLeakDetector$AdvancedTracing$Counts" />
<Bug pattern="CT_CONSTRUCTOR_THROW" />
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,30 @@
*/
package org.apache.ratis.protocol;

import org.apache.ratis.util.ReferenceCountedObject;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Asynchronous version of {@link RaftClientProtocol}. */
public interface RaftClientAsynchronousProtocol {
CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException;
/**
* A plain request is submitted from a client for processing.
*
* This default keeps older call sites compatible with implementations that operate on reference-counted requests.
*/
default CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) throws IOException {
return submitClientRequestAsync(ReferenceCountedObject.wrap(request));
}

/**
* A reference-counted request is submitted from a client for processing.
* Implementations of this method should retain the request, process it and then release it.
* The request may be retained even after the future returned by this method has completed.
*
* @return a future of the reply
* @see ReferenceCountedObject
*/
CompletableFuture<RaftClientReply> submitClientRequestAsync(
ReferenceCountedObject<RaftClientRequest> requestRef) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,13 @@ public SpanContextProto getSpanContext() {

@Override
public String toString() {
return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
+ type + ", " + getMessage();
return toStringShort() + ", " + getMessage();
}

/**
 * Build the compact string form of this request.
 *
 * @return the super class string followed by the sliding window entry and the type;
 *         the {@link #message} is deliberately left out.
 */
public String toStringShort() {
  final String prefix = super.toString();
  final String seq = ProtoUtils.toString(slidingWindowEntry);
  return prefix + ", seq=" + seq + ", " + type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

/**
Expand All @@ -46,6 +47,8 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private boolean closed = false;

public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction<E> getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
Expand All @@ -72,10 +75,34 @@ public void clear() {
}
}

/**
 * Pass each element of this queue to the given handler and then remove all the elements.
 * Both steps are executed while holding the queue lock.
 *
 * @param handler the callback invoked once per element before the queue is cleared.
 */
public void clear(Consumer<E> handler) {
  try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
    forEach(handler);
    super.clear();
  }
}

/**
 * Mark this queue as closed so that it stops accepting new elements,
 * i.e. the offer(…) methods return false from now on.
 * Closing does not remove anything: the elements already in the queue
 * can still be peeked, polled or cleared after close.
 */
public void close() {
  try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
    this.closed = true;
  }
}

@Override
public boolean offer(E element) {
Objects.requireNonNull(element, "element == null");
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand All @@ -95,6 +122,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio
long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(;;) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand Down
201 changes: 201 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
 * observe resource object life-cycle and assert proper resource closure before they are GCed.
 *
 * <p>
 * The constructor, {@link #start()} and {@link #track(Object, Supplier)} are package-private,
 * so the detector is created and fed from within this package.
 *
 * <p>
 * Example usage:
 *
 * <pre> {@code
 * class MyResource implements AutoCloseable {
 *   static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource").start();
 *
 *   private final Runnable leakTracker = LEAK_DETECTOR.track(this,
 *       // return the leak message (or null if there is no leak);
 *       // don't refer to the original object (MyResource) here.
 *       () -> "MyResource is not closed before being discarded.");
 *
 *   @Override
 *   public void close() {
 *     // proper resources cleanup...
 *     // inform tracker that this object is closed properly.
 *     leakTracker.run();
 *   }
 * }
 *
 * }</pre>
 */
public class LeakDetector {
  private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);

  /**
   * The set of all the {@link LeakTracker}s which have not been removed yet.
   * All the methods are synchronized on this object.
   */
  private static class LeakTrackerSet {
    /** {@link LeakTracker} does not override equals/hashCode, so the elements are compared by identity. */
    private final Set<LeakTracker> set = Collections.newSetFromMap(new HashMap<>());

    /** @return true if the given tracker was in this set and has been removed. */
    synchronized boolean remove(LeakTracker tracker) {
      return set.remove(tracker);
    }

    /** Remove the given tracker, which must be in this set; otherwise, the assertion fails. */
    synchronized void removeExisting(LeakTracker tracker) {
      final boolean removed = set.remove(tracker);
      Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker);
    }

    /**
     * Create a new {@link LeakTracker} for the given referent and add it to this set.
     *
     * @param referent the object to be tracked (only weakly referenced by the tracker).
     * @param queue the queue to which the tracker is registered.
     * @param leakReporter the supplier of the leak message.
     * @return the new tracker.
     */
    synchronized LeakTracker add(Object referent, ReferenceQueue<Object> queue, Supplier<String> leakReporter) {
      final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter);
      final boolean added = set.add(tracker);
      Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent);
      return tracker;
    }

    /**
     * Count the trackers remaining in this set which report a non-null leak message.
     *
     * @param throwException should the no-leak assertion be applied to the count?
     * @return the number of leaks counted.
     */
    synchronized int getNumLeaks(boolean throwException) {
      if (set.isEmpty()) {
        return 0;
      }

      int n = 0;
      for (LeakTracker tracker : set) {
        if (tracker.reportLeak() != null) {
          n++;
        }
      }
      if (throwException) {
        assertNoLeaks(n);
      }
      return n;
    }

    /** Assert that the given number of leaks is zero; the failure message also compares it with the set size. */
    synchronized void assertNoLeaks(int leaks) {
      Preconditions.assertTrue(leaks == 0, () -> {
        final int size = set.size();
        return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? "==" : "!=") + " set.size = " + size;
      });
    }
  }

  /** For generating a unique suffix of the detector names. */
  private static final AtomicLong COUNTER = new AtomicLong();

  private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
  /** All the {@link LeakTracker}s. */
  private final LeakTrackerSet trackers = new LeakTrackerSet();
  /** When a leak is discovered, a message is printed and added to this list. */
  private final List<String> leakMessages = Collections.synchronizedList(new ArrayList<>());
  private final String name;

  /** @param name the name prefix of this detector; the value of a global counter is appended to it. */
  LeakDetector(String name) {
    this.name = name + COUNTER.getAndIncrement();
  }

  /**
   * Start a daemon thread which processes the trackers enqueued to the reference queue.
   *
   * @return this detector.
   */
  LeakDetector start() {
    Thread t = new Thread(this::run);
    t.setName(LeakDetector.class.getSimpleName() + "-" + name);
    t.setDaemon(true);
    LOG.info("Starting leak detector thread {}.", name);
    t.start();
    return this;
  }

  /** The body of the detector thread: keep taking trackers from the queue until this thread is interrupted. */
  private void run() {
    while (true) {
      try {
        LeakTracker tracker = (LeakTracker) queue.remove();
        // Original resource already been GCed, if tracker is not closed yet,
        // report a leak.
        if (trackers.remove(tracker)) {
          final String leak = tracker.reportLeak();
          if (leak != null) {
            leakMessages.add(leak);
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("Thread interrupted, exiting.", e);
        break;
      }
    }

    LOG.warn("Exiting leak detector {}.", name);
    // The loop above can only be left by an interruption:
    // restore the interrupt status, which was cleared when InterruptedException was thrown.
    // It is done after the last log statement so that logging is not affected by the interrupt flag.
    Thread.currentThread().interrupt();
  }

  /**
   * Start tracking the given object.
   *
   * @param leakable the object to be tracked.
   * @param reportLeak the supplier returning the leak message, or null if there is no leak.
   * @return a {@link Runnable} to be run when the object is completely released;
   *         it removes the tracker from this detector.
   */
  Runnable track(Object leakable, Supplier<String> reportLeak) {
    // TODO: A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
    //       if we have proofs that leak tracking impacts performance, or a single LeakDetector
    //       thread can't keep up with the pace of object allocation.
    //       For now, it looks effective enough and let keep it simple.
    return trackers.add(leakable, queue, reportLeak)::remove;
  }

  /** @return the number of remaining trackers which report a leak; no exception is thrown for a non-zero count. */
  public int getLeakCount() {
    return trackers.getNumLeaks(false);
  }

  /**
   * Assert that there are no leaks.
   * First, assert that no leak messages have been recorded so far.
   * Then, check the remaining trackers up to the given number of times, sleeping between the checks.
   * If leaks are still counted after all the retries, the no-leak assertion is applied to a final count.
   *
   * @param maxRetries the maximum number of checks before the final assertion.
   * @param retrySleep the sleep time after a check which found leaks.
   * @throws InterruptedException if the sleep is interrupted.
   */
  public void assertNoLeaks(int maxRetries, TimeDuration retrySleep) throws InterruptedException {
    synchronized (leakMessages) {
      // leakMessages are all the leaks discovered so far.
      Preconditions.assertTrue(leakMessages.isEmpty(),
          () -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
    }

    for(int i = 0; i < maxRetries; i++) {
      final int numLeaks = trackers.getNumLeaks(false);
      if (numLeaks == 0) {
        return;
      }
      LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks);
      retrySleep.sleep();
    }
    trackers.getNumLeaks(true);
  }

  /** A weak reference to a tracked object, together with the methods to remove it and to get its leak message. */
  private static final class LeakTracker extends WeakReference<Object> {
    private final Consumer<LeakTracker> removeMethod;
    private final Supplier<String> getLeakMessage;

    LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
        Consumer<LeakTracker> removeMethod, Supplier<String> getLeakMessage) {
      super(referent, referenceQueue);
      this.removeMethod = removeMethod;
      this.getLeakMessage = getLeakMessage;
    }

    /** Called by the tracked resource when the object is completely released. */
    void remove() {
      removeMethod.accept(this);
    }

    /** @return the leak message if there is a leak; return null if there is no leak. */
    String reportLeak() {
      return getLeakMessage.get();
    }
  }
}
Loading
Loading