Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.file.SchemaCache.WeakSchemaCache;
import org.apache.avro.file.SchemaCache.SoftSchemaCache;

import java.io.Closeable;
import java.io.EOFException;
Expand All @@ -49,6 +50,24 @@
*/
public class DataFileStream<D> implements Iterator<D>, Iterable<D>, Closeable {

/**
 * Process-wide schema cache for data-file headers, chosen once at class load
 * from the "avro.schema.cache" system property: "none" (default, parse every
 * time), "weak" (weakly referenced values) or "soft" (softly referenced
 * values). Any other value fails class initialization.
 */
private static final SchemaCache SCHEMA_CACHE;
static {
  // Compare case-insensitively with equalsIgnoreCase instead of the no-arg
  // String.toLowerCase(): the latter is default-locale-sensitive and would
  // mis-fold input such as "NONE" under e.g. the Turkish locale (dotless i).
  String cacheType = System.getProperty("avro.schema.cache", "none");
  if ("none".equalsIgnoreCase(cacheType)) {
    SCHEMA_CACHE = SchemaCache.NO_CACHE;
  } else if ("weak".equalsIgnoreCase(cacheType)) {
    SCHEMA_CACHE = WeakSchemaCache.INSTANCE;
  } else if ("soft".equalsIgnoreCase(cacheType)) {
    SCHEMA_CACHE = SoftSchemaCache.INSTANCE;
  } else {
    // NOTE: thrown from a static initializer, this surfaces to callers as an
    // ExceptionInInitializerError on first use of DataFileStream.
    throw new IllegalArgumentException("Unknown schema cache type: " + cacheType);
  }
}

/**
* A handle that can be used to reopen a DataFile without re-reading the header
* of the stream.
Expand Down Expand Up @@ -118,6 +137,11 @@ void validateMagic(byte[] magic) throws InvalidAvroMagicException {

/**
 * Initialize the stream by reading from its head. Delegates to the
 * three-argument overload using the JVM-wide cache selected by the
 * "avro.schema.cache" system property.
 */
void initialize(InputStream in, byte[] magic) throws IOException {
initialize(in, magic, SCHEMA_CACHE);
}

/** Initialize the stream by reading from its head. */
protected void initialize(InputStream in, byte[] magic, SchemaCache schemaCache) throws IOException {
this.header = new Header();
this.vin = DecoderFactory.get().binaryDecoder(in, vin);
magic = (magic == null) ? readMagic() : magic;
Expand All @@ -140,8 +164,8 @@ void initialize(InputStream in, byte[] magic) throws IOException {

// finalize the header
header.metaKeyList = Collections.unmodifiableList(header.metaKeyList);
header.schema = new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false)
.parse(getMetaString(DataFileConstants.SCHEMA));
header.schema = schemaCache.getOrParseSchema(getMetaString(DataFileConstants.SCHEMA));

this.codec = resolveCodec();
reader.setSchema(header.schema);
}
Expand Down
225 changes: 225 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/file/SchemaCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.file;

import org.apache.avro.NameValidator;
import org.apache.avro.Schema;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Abstract base class for caching parsed Avro schemas. Provides different
* caching strategies including no cache, concurrent cache, weak and soft cache.
*/
public abstract class SchemaCache {

  /**
   * A cache implementation that does not cache schemas: every call parses the
   * meta string anew.
   */
  public static final SchemaCache NO_CACHE = new SchemaCache() {
    @Override
    public Schema getOrParseSchema(String metaString) {
      return parse(metaString);
    }
  };

  /**
   * Gets the schema for the given meta string, parsing it if not known.
   *
   * @param metaString the schema string to parse or retrieve from cache
   * @return the parsed Schema
   */
  public abstract Schema getOrParseSchema(String metaString);

  /**
   * Returns the number of cached entries.
   *
   * @return the current number of entries (always 0 for non-caching
   *         implementations)
   */
  public int size() {
    return 0;
  }

  /**
   * Parses the schema string into a Schema object. Name validation and default
   * validation are disabled, matching how DataFileStream has always parsed
   * header schemas.
   *
   * @param metaString the schema string
   * @return the parsed Schema
   */
  protected Schema parse(String metaString) {
    return new Schema.Parser(NameValidator.NO_VALIDATION).setValidateDefaults(false).parse(metaString);
  }

  /**
   * Creates a concurrent cache that strongly references schemas. Entries are
   * never evicted, so this is only suitable when the set of distinct schemas
   * is bounded.
   *
   * @return a SchemaCache with concurrent strong caching
   */
  public static SchemaCache createConcurrentCache() {
    return new SchemaCache() {
      private final ConcurrentMap<String, Schema> cache = new ConcurrentHashMap<>();

      @Override
      public int size() {
        return cache.size();
      }

      @Override
      public Schema getOrParseSchema(String metaString) {
        return cache.computeIfAbsent(metaString, this::parse);
      }
    };
  }

  /**
   * A cache implementation that uses weak references for schema values, allowing
   * them to be garbage collected as soon as they are not strongly referenced.
   */
  public static class WeakSchemaCache extends SchemaCache {
    /**
     * Shared weak-cache instance; schema values may be garbage collected when
     * no longer strongly referenced elsewhere.
     */
    public static final SchemaCache INSTANCE = createWeakCache();

    private final ConcurrentMap<String, WeakValueRef> cache = new ConcurrentHashMap<>();
    private final ReferenceQueue<Schema> queue = new ReferenceQueue<>();

    @Override
    public Schema getOrParseSchema(String metaString) {
      trim();

      // Capture a strong reference to the schema while still inside compute().
      // Calling get() on the reference returned by compute() instead is racy:
      // once the lambda returns, a freshly parsed Schema is only weakly
      // reachable and the GC may clear it before get() runs, yielding null.
      Schema[] result = new Schema[1];
      cache.compute(metaString, (k, ref) -> {
        if (ref != null) {
          Schema schema = ref.get();
          if (schema != null) {
            result[0] = schema;
            return ref;
          }
        }
        // Absent or cleared, parse new
        Schema schema = parse(metaString);
        result[0] = schema;
        return new WeakValueRef(k, schema, queue);
      });
      return result[0];
    }

    @Override
    public int size() {
      trim();
      return cache.size();
    }

    /**
     * Cleans up entries with cleared weak references from the cache.
     */
    void trim() {
      // Clean up cleared references; the two-argument remove only evicts the
      // mapping if it still points at the cleared reference (a fresh entry for
      // the same key must survive).
      WeakValueRef clearedRef;
      while ((clearedRef = (WeakValueRef) queue.poll()) != null) {
        cache.remove(clearedRef.key, clearedRef);
      }
    }

    /** Weak reference that remembers its cache key for eviction. */
    private static class WeakValueRef extends WeakReference<Schema> {
      final String key;

      WeakValueRef(String key, Schema referent, ReferenceQueue<Schema> q) {
        super(referent, q);
        this.key = key;
      }
    }
  }

  /**
   * A cache implementation that uses soft references for schema values, allowing
   * them to be garbage collected when not strongly referenced, and the collector
   * sees fit (typically only under memory pressure).
   */
  public static class SoftSchemaCache extends SchemaCache {
    /**
     * Shared soft-cache instance; schema values are retained until the
     * collector decides to reclaim softly reachable objects.
     */
    public static final SchemaCache INSTANCE = createSoftCache();

    private final ConcurrentMap<String, SoftValueRef> cache = new ConcurrentHashMap<>();
    private final ReferenceQueue<Schema> queue = new ReferenceQueue<>();

    @Override
    public Schema getOrParseSchema(String metaString) {
      trim();

      // Capture a strong reference to the schema while still inside compute().
      // Calling get() on the reference returned by compute() instead is racy:
      // once the lambda returns, a freshly parsed Schema is only softly
      // reachable and the GC may clear it before get() runs, yielding null.
      Schema[] result = new Schema[1];
      cache.compute(metaString, (k, ref) -> {
        if (ref != null) {
          Schema schema = ref.get();
          if (schema != null) {
            result[0] = schema;
            return ref;
          }
        }
        // Absent or cleared, parse new
        Schema schema = parse(metaString);
        result[0] = schema;
        return new SoftValueRef(k, schema, queue);
      });
      return result[0];
    }

    @Override
    public int size() {
      trim();
      return cache.size();
    }

    /**
     * Cleans up entries with cleared references from the cache.
     */
    void trim() {
      // Clean up cleared references; the two-argument remove only evicts the
      // mapping if it still points at the cleared reference (a fresh entry for
      // the same key must survive).
      SoftValueRef clearedRef;
      while ((clearedRef = (SoftValueRef) queue.poll()) != null) {
        cache.remove(clearedRef.key, clearedRef);
      }
    }

    /** Soft reference that remembers its cache key for eviction. */
    private static class SoftValueRef extends SoftReference<Schema> {
      final String key;

      SoftValueRef(String key, Schema referent, ReferenceQueue<Schema> q) {
        super(referent, q);
        this.key = key;
      }
    }
  }

  /**
   * Creates a weak cache that allows schema values to be garbage collected.
   *
   * @return a WeakSchemaCache instance
   */
  public static WeakSchemaCache createWeakCache() {
    return new WeakSchemaCache();
  }

  /**
   * Creates a soft cache that allows schema values to be garbage collected.
   *
   * @return a SoftSchemaCache instance
   */
  public static SoftSchemaCache createSoftCache() {
    return new SoftSchemaCache();
  }
}
Loading