From 34e691036454cdcf66b09a68655a4620e3474531 Mon Sep 17 00:00:00 2001 From: Mike Skells Date: Wed, 29 Apr 2026 23:11:00 +0100 Subject: [PATCH 1/5] AVRO-4249 provide a cache of schema to avoid building --- .../org/apache/avro/file/DataFileStream.java | 10 ++++-- .../org/apache/avro/file/SchemaCache.java | 33 +++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index e2e79d8eaed..4f0e342804c 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -118,6 +118,12 @@ void validateMagic(byte[] magic) throws InvalidAvroMagicException { /** Initialize the stream by reading from its head. */ void initialize(InputStream in, byte[] magic) throws IOException { + initialize(in, magic, SchemaCache.NO_CACHE); + } + + + /** Initialize the stream by reading from its head. */ + protected void initialize(InputStream in, byte[] magic, SchemaCache schemaCache) throws IOException { this.header = new Header(); this.vin = DecoderFactory.get().binaryDecoder(in, vin); magic = (magic == null) ? 
readMagic() : magic; @@ -140,8 +146,8 @@ void initialize(InputStream in, byte[] magic) throws IOException { // finalize the header header.metaKeyList = Collections.unmodifiableList(header.metaKeyList); - header.schema = new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false) - .parse(getMetaString(DataFileConstants.SCHEMA)); + header.schema = schemaCache.getOrParseSchema(getMetaString(DataFileConstants.SCHEMA)); + this.codec = resolveCodec(); reader.setSchema(header.schema); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java new file mode 100644 index 00000000000..338e9a34cf7 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java @@ -0,0 +1,33 @@ +package org.apache.avro.file; + +import org.apache.avro.NameValidator; +import org.apache.avro.Schema; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public abstract class SchemaCache { + public abstract Schema getOrParseSchema(String metaString); + + protected Schema parse(String metaString) { + return new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false) + .parse(metaString); + } + + public static final SchemaCache NO_CACHE = new SchemaCache() { + @Override + public Schema getOrParseSchema(String metaString) { + return parse(metaString); + } + }; + public static SchemaCache createConcurrentCache() { + return new SchemaCache() { + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + + @Override + public Schema getOrParseSchema(String metaString) { + return cache.computeIfAbsent(metaString, this::parse); + } + }; + } + +} From 9f5b962f41f14030c4ba1e4a5821b1e757b3f5bd Mon Sep 17 00:00:00 2001 From: mkeskells Date: Sat, 2 May 2026 22:48:07 +0100 Subject: [PATCH 2/5] AVRO-4249 provide a cache of schema to avoid building add tests and a benchmark --- .../org/apache/avro/file/DataFileStream.java | 20 
++- .../org/apache/avro/file/SchemaCache.java | 115 ++++++++++++++++- .../org/apache/avro/file/TestSchemaCache.java | 90 +++++++++++++ .../avro/perf/SchemaCacheEffectTest.java | 119 ++++++++++++++++++ .../org/apache/avro/perf/SchemaCacheTest.java | 90 +++++++++++++ .../avro/perf/test/basic/SchemaCacheTest.java | 78 ++++++++++++ 6 files changed, 503 insertions(+), 9 deletions(-) create mode 100644 lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java create mode 100644 lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java create mode 100644 lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheTest.java create mode 100644 lang/java/perf/src/main/java/org/apache/avro/perf/test/basic/SchemaCacheTest.java diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index 4f0e342804c..e7d987b3d33 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -19,12 +19,12 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.InvalidAvroMagicException; -import org.apache.avro.NameValidator; import org.apache.avro.Schema; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.file.SchemaCache.WeakSchemaCache; import java.io.Closeable; import java.io.EOFException; @@ -49,6 +49,21 @@ */ public class DataFileStream implements Iterator, Iterable, Closeable { + private final static SchemaCache SCHEMA_CACHE; + static { + String cacheType = System.getProperty("avro.schema.cache", "none").toLowerCase(); + switch (cacheType) { + case "none": + SCHEMA_CACHE = SchemaCache.NO_CACHE; + break; + case "weak": + SCHEMA_CACHE = WeakSchemaCache.INSTANCE; + break; + default: + throw new 
IllegalArgumentException("Unknown schema cache type: " + cacheType); + } + } + /** * A handle that can be used to reopen a DataFile without re-reading the header * of the stream. @@ -118,10 +133,9 @@ void validateMagic(byte[] magic) throws InvalidAvroMagicException { /** Initialize the stream by reading from its head. */ void initialize(InputStream in, byte[] magic) throws IOException { - initialize(in, magic, SchemaCache.NO_CACHE); + initialize(in, magic, SCHEMA_CACHE); } - /** Initialize the stream by reading from its head. */ protected void initialize(InputStream in, byte[] magic, SchemaCache schemaCache) throws IOException { this.header = new Header(); diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java index 338e9a34cf7..c1da2406e79 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java @@ -2,27 +2,64 @@ import org.apache.avro.NameValidator; import org.apache.avro.Schema; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +/** + * Abstract base class for caching parsed Avro schemas. Provides different + * caching strategies including no cache, concurrent cache, and weak cache. + */ public abstract class SchemaCache { - public abstract Schema getOrParseSchema(String metaString); - - protected Schema parse(String metaString) { - return new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false) - .parse(metaString); - } + /** + * A cache implementation that does not cache schemas, always parsing them. + */ public static final SchemaCache NO_CACHE = new SchemaCache() { @Override public Schema getOrParseSchema(String metaString) { return parse(metaString); } }; + + /** + * Gets the schema for the given meta string, parsing it if not known. 
+ * + * @param metaString the schema string to parse or retrieve from cache + * @return the parsed Schema + */ + public abstract Schema getOrParseSchema(String metaString); + + /** return the number of cached entries */ + public int size() { + return 0; + } + + /** + * Parses the schema string into a Schema object. + * + * @param metaString the schema string + * @return the parsed Schema + */ + protected Schema parse(String metaString) { + return new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false).parse(metaString); + } + + /** + * Creates a concurrent cache that strongly references schemas. + * + * @return a SchemaCache with concurrent strong caching + */ public static SchemaCache createConcurrentCache() { return new SchemaCache() { private final ConcurrentMap cache = new ConcurrentHashMap<>(); + @Override + public int size() { + return cache.size(); + } + @Override public Schema getOrParseSchema(String metaString) { return cache.computeIfAbsent(metaString, this::parse); @@ -30,4 +67,70 @@ public Schema getOrParseSchema(String metaString) { }; } + /** + * A cache implementation that uses weak references for schema values, allowing + * them to be garbage collected when not strongly referenced. + */ + public static class WeakSchemaCache extends SchemaCache { + /** + * A weak cache implementation that allows schema values to be garbage + * collected. 
+ */ + public static final SchemaCache INSTANCE = createWeakCache(); + + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final ReferenceQueue queue = new ReferenceQueue<>(); + + @Override + public Schema getOrParseSchema(String metaString) { + trim(); + + return cache.compute(metaString, (k, ref) -> { + if (ref != null) { + Schema schema = ref.get(); + if (schema != null) { + return ref; + } + } + // Absent or cleared, parse new + Schema schema = parse(metaString); + return new WeakValueRef(k, schema, queue); + }).get(); + } + + @Override + public int size() { + trim(); + return cache.size(); + } + + /** + * Cleans up entries with cleared weak references from the cache. + */ + void trim() { + // Clean up cleared references + WeakValueRef clearedRef; + while ((clearedRef = (WeakValueRef) queue.poll()) != null) { + cache.remove(clearedRef.key, clearedRef); + } + } + + private static class WeakValueRef extends WeakReference { + final String key; + + WeakValueRef(String key, Schema referent, ReferenceQueue q) { + super(referent, q); + this.key = key; + } + } + } + + /** + * Creates a weak cache that allows schema values to be garbage collected. + * + * @return a WeakSchemaCache instance + */ + public static WeakSchemaCache createWeakCache() { + return new WeakSchemaCache(); + } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java b/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java new file mode 100644 index 00000000000..dc31e0a56ee --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.file; + +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class TestSchemaCache { + + private static final String SCHEMA_STRING = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": [{\"name\": \"f\", \"type\": \"string\"}]}"; + + @Test + void testNoCache() { + SchemaCache cache = SchemaCache.NO_CACHE; + Schema schema1 = cache.getOrParseSchema(SCHEMA_STRING); + Schema schema2 = cache.getOrParseSchema(SCHEMA_STRING); + // Should be different instances since no caching + assertNotSame(schema1, schema2); + assertEquals(schema1, schema2); + } + + @Test + void testConcurrentCache() { + SchemaCache cache = SchemaCache.createConcurrentCache(); + Schema schema1 = cache.getOrParseSchema(SCHEMA_STRING); + Schema schema2 = cache.getOrParseSchema(SCHEMA_STRING); + // Should be same instance due to caching + assertSame(schema1, schema2); + } + + @Test + void testWeakCache() { + SchemaCache.WeakSchemaCache cache = SchemaCache.createWeakCache(); + Schema schema1 = cache.getOrParseSchema(SCHEMA_STRING); + Schema schema2 = cache.getOrParseSchema(SCHEMA_STRING); + // Should be same instance due to caching + assertSame(schema1, schema2); + // Test trim method + cache.trim(); // Should not throw + } + + @Test + void testWeakCacheAllowsGc() throws InterruptedException { + 
SchemaCache.WeakSchemaCache cache = SchemaCache.createWeakCache(); + // Parse schema + Schema schema = cache.getOrParseSchema(SCHEMA_STRING); + assertEquals(1, cache.size(), "Cache should be 1 after fetch"); + // Hold a weak reference to it + java.lang.ref.WeakReference weakRef = new java.lang.ref.WeakReference<>(schema); + // Clear strong reference + schema = null; + // Force GC + + for (int i = 0; i < 10 && weakRef.get() != null; i++) { + System.gc(); + System.runFinalization(); + Thread.sleep(100); // Allow GC to run + } + assertNull(weakRef.get(), "Schema should have been GC'd"); + assertEquals(0, cache.size(), "Cache should be empty after GC"); + Schema retrieved = cache.getOrParseSchema(SCHEMA_STRING); + assertNotNull(retrieved); + assertEquals(1, cache.size(), "Cache should be 1 after fetch"); + // Now clear cache reference by trimming if possible, but since it's weak, after + // GC it should be gone + // But in practice, the cache holds the WeakReference, so the Schema is still + // referenced. + // To test GC, we need to not hold the retrieved reference. + // This is hard to test reliably in unit tests. + // For now, just test basic caching. + } +} diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java new file mode 100644 index 00000000000..815a616a403 --- /dev/null +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.perf; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(2) +@State(Scope.Benchmark) +public class SchemaCacheEffectTest { + + @Param({ "5", "50", "500" }) + private int numFields; + + @Param({ "none", 
"weak" }) + private String cacheType; + + @Param({ "1", "10", "100" }) + private int numRecords; + + private String schemaString; + private Schema schema; + private byte[] avroData; + + @Setup(Level.Trial) + public void setup() { + // Create schema string with specified number of fields + StringBuilder sb = new StringBuilder(); + sb.append("{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["); + for (int i = 0; i < numFields; i++) { + if (i > 0) + sb.append(","); + sb.append("{\"name\": \"f").append(i).append("\", \"type\": \"string\"}"); + } + sb.append("]}"); + schemaString = sb.toString(); + schema = new Schema.Parser().parse(schemaString); + + // Set system property for cache type + System.setProperty("avro.schema.cache", cacheType); + + // Create Avro data with specified number of records + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) { + writer.create(schema, baos); + for (int i = 0; i < numRecords; i++) { + GenericRecord record = new GenericData.Record(schema); + for (int j = 0; j < numFields; j++) { + record.put("f" + j, "value" + i + "_" + j); + } + writer.append(record); + } + writer.close(); + avroData = baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Benchmark + @Threads(5) + public void benchmarkDataReading(Blackhole bh) throws IOException { + try (DataFileStream reader = new DataFileStream<>(new ByteArrayInputStream(avroData), + new GenericDatumReader())) { + for (GenericRecord record : reader) { + bh.consume(record); + // Consume the record (do nothing for benchmark) + } + } + } + +} diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheTest.java new file mode 100644 index 00000000000..bd86fbd24d8 --- /dev/null +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheTest.java @@ -0,0 +1,90 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.perf; + +import org.apache.avro.Schema; +import org.apache.avro.file.SchemaCache; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(2) +@State(Scope.Benchmark) +public class SchemaCacheTest { + + @Param({ "5", "50", "500" }) + private int numFields; + + @Param({ "NO_CACHE", "CONCURRENT", "WEAK" }) + private String cacheType; + + private SchemaCache cache; + private String schemaString; + + @Setup(Level.Trial) + public void setup() 
{ + // Create schema string with specified number of fields + StringBuilder sb = new StringBuilder(); + sb.append("{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["); + for (int i = 0; i < numFields; i++) { + if (i > 0) + sb.append(","); + sb.append("{\"name\": \"f").append(i).append("\", \"type\": \"string\"}"); + } + sb.append("]}"); + schemaString = sb.toString(); + + // Initialize cache based on type + switch (cacheType) { + case "NO_CACHE": + cache = SchemaCache.NO_CACHE; + break; + case "CONCURRENT": + cache = SchemaCache.createConcurrentCache(); + break; + case "WEAK": + cache = SchemaCache.createWeakCache(); + break; + default: + throw new IllegalArgumentException("Unknown cache type: " + cacheType); + } + + } + + @Benchmark + public Schema benchmarkSchemaParsing() { + return cache.getOrParseSchema(schemaString); + } + +} diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/test/basic/SchemaCacheTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/test/basic/SchemaCacheTest.java new file mode 100644 index 00000000000..95ee3dffdf3 --- /dev/null +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/test/basic/SchemaCacheTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.perf.test.basic; + +import org.apache.avro.file.SchemaCache; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +public class SchemaCacheTest { + + @State(Scope.Benchmark) + public static class TestState { + @Param({ "5", "50", "500" }) + int schemaSize; + + @Param({ "no_cache", "concurrent", "weak" }) + String cacheType; + + SchemaCache cache; + String schemaString; + + @Setup(Level.Trial) + public void setup() { + schemaString = generateSchema(schemaSize); + switch (cacheType) { + case "no_cache": + cache = SchemaCache.NO_CACHE; + break; + case "concurrent": + cache = SchemaCache.createConcurrentCache(); + break; + case "weak": + cache = SchemaCache.createWeakCache(); + break; + default: + throw new IllegalArgumentException("Unknown cache type: " + cacheType); + } + } + + private String generateSchema(int numFields) { + + StringBuilder sb = new StringBuilder(); + sb.append("{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": ["); + for (int i = 0; i < numFields; i++) { + if (i > 0) + sb.append(","); + sb.append("{\"name\": \"field").append(i).append("\", \"type\": \"string\"}"); + } + sb.append("]}"); + return sb.toString(); + } + } + + @Benchmark + public void getOrParseSchema(TestState state) { + state.cache.getOrParseSchema(state.schemaString); + } +} From b14e690669b55e29f6f535af953a9f6b3c921465 Mon Sep 17 00:00:00 2001 From: mkeskells Date: Sun, 3 May 2026 15:46:51 +0100 Subject: [PATCH 3/5] AVRO-4249 add licence --- .../java/org/apache/avro/file/SchemaCache.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java 
b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java index c1da2406e79..923054578fa 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.avro.file; import org.apache.avro.NameValidator; From 5e6f42716d632849d888c042575d72d04f4c6ea0 Mon Sep 17 00:00:00 2001 From: mkeskells Date: Thu, 7 May 2026 08:45:45 +0100 Subject: [PATCH 4/5] AVRO-4249 fix checkstyle noise --- .../main/java/org/apache/avro/perf/SchemaCacheEffectTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java index 815a616a403..6d1a299d0bc 100644 --- a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java @@ -19,10 +19,8 @@ package org.apache.avro.perf; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; From 88c8eeddc227d3db54814d0900f38f7907cdf4e2 Mon Sep 17 00:00:00 2001 From: mkeskells Date: Fri, 8 May 2026 22:42:25 +0100 Subject: [PATCH 5/5] AVRO-4249 add a soft reference cache --- .../org/apache/avro/file/DataFileStream.java | 4 + .../org/apache/avro/file/SchemaCache.java | 76 ++++++++++++++++++- .../org/apache/avro/file/TestSchemaCache.java | 62 +++++++++++++-- .../avro/perf/SchemaCacheEffectTest.java | 2 +- 4 files changed, 134 insertions(+), 10 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index e7d987b3d33..d5e3a1d8c99 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -25,6 +25,7 @@ import 
org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; import org.apache.avro.file.SchemaCache.WeakSchemaCache; +import org.apache.avro.file.SchemaCache.SoftSchemaCache; import java.io.Closeable; import java.io.EOFException; @@ -59,6 +60,9 @@ public class DataFileStream implements Iterator, Iterable, Closeable { case "weak": SCHEMA_CACHE = WeakSchemaCache.INSTANCE; break; + case "soft": + SCHEMA_CACHE = SoftSchemaCache.INSTANCE; + break; default: throw new IllegalArgumentException("Unknown schema cache type: " + cacheType); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java index 923054578fa..fbbe02bd16e 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java @@ -19,14 +19,16 @@ import org.apache.avro.NameValidator; import org.apache.avro.Schema; + import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * Abstract base class for caching parsed Avro schemas. Provides different - * caching strategies including no cache, concurrent cache, and weak cache. + * caching strategies including no cache, concurrent cache, weak and soft cache. */ public abstract class SchemaCache { @@ -48,7 +50,9 @@ public Schema getOrParseSchema(String metaString) { */ public abstract Schema getOrParseSchema(String metaString); - /** return the number of cached entries */ + /** + * return the number of cached entries + */ public int size() { return 0; } @@ -142,6 +146,65 @@ private static class WeakValueRef extends WeakReference { } } + /** + * A cache implementation that uses soft references for schema values, allowing + * them to be garbage collected when not strongly referenced, and the collector + * sees fit. 
+ */ + public static class SoftSchemaCache extends SchemaCache { + /** + * A shared soft cache instance that allows schema values to be garbage + * collected. + */ + public static final SchemaCache INSTANCE = createSoftCache(); + + private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final ReferenceQueue queue = new ReferenceQueue<>(); + + @Override + public Schema getOrParseSchema(String metaString) { + trim(); + + return cache.compute(metaString, (k, ref) -> { + if (ref != null) { + Schema schema = ref.get(); + if (schema != null) { + return ref; + } + } + // Absent or cleared, parse new + Schema schema = parse(metaString); + return new SoftValueRef(k, schema, queue); + }).get(); + } + + @Override + public int size() { + trim(); + return cache.size(); + } + + /** + * Cleans up entries with cleared references from the cache. + */ + void trim() { + // Clean up cleared references + SoftValueRef clearedRef; + while ((clearedRef = (SoftValueRef) queue.poll()) != null) { + cache.remove(clearedRef.key, clearedRef); + } + } + + private static class SoftValueRef extends SoftReference { + final String key; + + SoftValueRef(String key, Schema referent, ReferenceQueue q) { + super(referent, q); + this.key = key; + } + } + } + /** * Creates a weak cache that allows schema values to be garbage collected. * * @return a WeakSchemaCache instance */ @@ -150,4 +213,13 @@ private static class WeakValueRef extends WeakReference { } public static WeakSchemaCache createWeakCache() { return new WeakSchemaCache(); } + + /** + * Creates a soft cache that allows schema values to be garbage collected. 
+ * + * @return a SoftSchemaCache instance + */ + public static SoftSchemaCache createSoftCache() { + return new SoftSchemaCache(); + } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java b/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java index dc31e0a56ee..bffbb1c23a7 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java +++ b/lang/java/avro/src/test/java/org/apache/avro/file/TestSchemaCache.java @@ -20,9 +20,14 @@ import org.apache.avro.Schema; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import java.util.ArrayList; +import java.util.List; import static org.junit.jupiter.api.Assertions.*; +@Isolated public class TestSchemaCache { private static final String SCHEMA_STRING = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": [{\"name\": \"f\", \"type\": \"string\"}]}"; @@ -74,17 +79,60 @@ void testWeakCacheAllowsGc() throws InterruptedException { System.runFinalization(); Thread.sleep(100); // Allow GC to run } + // the isnt really a guarantee that its cleared by now, but is seems ok fo the + // unit test + // at least. + // So by now we expect the GC to have cleared the schema from the cache, + // so the weak reference should be null and the cache should be empty (or more + // accurately become empty when checked). 
assertNull(weakRef.get(), "Schema should have been GC'd"); assertEquals(0, cache.size(), "Cache should be empty after GC"); + // and now it should be able to parse again and cache it again + Schema retrieved = cache.getOrParseSchema(SCHEMA_STRING); + assertNotNull(retrieved); + assertEquals(1, cache.size(), "Cache should be 1 after fetch"); + } + + @Test + void testSoftCacheAllowsGc() throws InterruptedException { + SchemaCache.SoftSchemaCache cache = SchemaCache.createSoftCache(); + // Parse schema + Schema schema = cache.getOrParseSchema(SCHEMA_STRING); + assertEquals(1, cache.size(), "Cache should be 1 after fetch"); + // Hold a weak reference to it + java.lang.ref.WeakReference weakRef = new java.lang.ref.WeakReference<>(schema); + // Clear strong reference + schema = null; + // Force GC + + for (int i = 0; i < 10 && weakRef.get() != null; i++) { + System.gc(); + System.runFinalization(); + Thread.sleep(100); // Allow GC to run + } + // There is no real guarantee whether it has been cleared by now, but this seems + // acceptable for the unit test + // at least. + // So by now we expect the GC to have not cleared the schema from the cache, as + // there isn't any memory pressure, + // so the weak reference should still be non-null and the cache should not be empty. + + assertNotNull(weakRef.get(), "Schema should not have been GC'd; it is soft and there is no memory pressure"); + assertEquals(1, cache.size(), "Cache should not be empty after GC"); Schema retrieved = cache.getOrParseSchema(SCHEMA_STRING); assertNotNull(retrieved); assertEquals(1, cache.size(), "Cache should be 1 after fetch"); - // Now clear cache reference by trimming if possible, but since it's weak, after - // GC it should be gone - // But in practice, the cache holds the WeakReference, so the Schema is still - // referenced. - // To test GC, we need to not hold the retrieved reference. - // This is hard to test reliably in unit tests. - // For now, just test basic caching. 
+ retrieved = null; + // Now we can try to create some memory pressure to encourage GC to clear the + // soft reference. + try { + List allMemory = new ArrayList<>(20000); + while (true) { + allMemory.add(new byte[1024 * 1024 * 1024]); // Allocate 1GB chunks + } + } catch (OutOfMemoryError ignored) { + } + assertEquals(0, cache.size(), "Cache should be empty once memory pressure clears the soft references"); + } } diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java index 6d1a299d0bc..235c58d9434 100644 --- a/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/SchemaCacheEffectTest.java @@ -57,7 +57,7 @@ public class SchemaCacheEffectTest { @Param({ "5", "50", "500" }) private int numFields; - @Param({ "none", "weak" }) + @Param({ "none", "weak", "soft" }) private String cacheType; @Param({ "1", "10", "100" })