Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.avro;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
Expand Down Expand Up @@ -58,6 +59,30 @@ public class SystemLimitException extends AvroRuntimeException {
private static int maxCollectionLength = MAX_ARRAY_VM_LIMIT;
private static int maxStringLength = MAX_ARRAY_VM_LIMIT;

private static final Logger LOG = LoggerFactory.getLogger(SystemLimitException.class);

/**
* System property declaring max size of any decompression stream: {@value}.
*/
public static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength";

/**
 * Upper bound for the default decompression limit; the effective default is
 * the lower of this value and the heap-aware limit: {@value}.
 */
private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024;

/**
 * Divisor applied to the runtime's maximum heap size to bound the default
 * decompression limit, so that a single decompressed block cannot exhaust
 * constrained JVMs: {@value}.
 */
private static final long DEFAULT_MAX_DECOMPRESS_HEAP_FRACTION = 4;

/**
* Calculated max decompress length.
*/
public static final long MAX_DECOMPRESS_LENGTH = getLongLimitFromProperty(MAX_DECOMPRESS_LENGTH_PROPERTY,
defaultMaxDecompressLength());

static {
// Load the configured limits from the system properties at class-initialization time.
resetLimits();
}
Expand Down Expand Up @@ -89,6 +114,44 @@ private static int getLimitFromProperty(String property, int defaultValue) {
return i;
}

/**
* Get a long value stored in a system property, used to configure the system
* behaviour of output.
*
* @param property The system property to fetch
* @param defaultValue The value to use if the system property is not present or
* parsable as a long
* @return The value from the system property
*/
private static long getLongLimitFromProperty(String property, long defaultValue) {
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
String prop = System.getProperty(property);
long limit = defaultValue;
if (prop != null) {
try {
long parsed = Long.parseLong(prop);
if (parsed <= 0) {
LOG.warn("Invalid value '{}' for property '{}': must be positive. Using default: {}", prop, property,
defaultValue);
} else {
limit = parsed;
}
} catch (NumberFormatException e) {
LOG.warn("Could not parse property '{}' value '{}'. Using default: {}", property, prop, defaultValue);
}
}
return limit;
}

/**
* Calculate a max decompression length as a fraction of
* the maximum memory of the runtime.
* @return the calculated max default decompression length.
*/
private static long defaultMaxDecompressLength() {
  // Heap-aware candidate: a fixed fraction of the JVM's maximum memory, floored at one byte.
  final long heapBased = Math.max(1L, Runtime.getRuntime().maxMemory() / DEFAULT_MAX_DECOMPRESS_HEAP_FRACTION);
  // Never exceed the static cap, even on very large heaps.
  return Math.min(DEFAULT_MAX_DECOMPRESS_LENGTH, heapBased);
}

/**
* Check to ensure that reading the bytes is within the specified limits.
*
Expand Down Expand Up @@ -209,6 +272,21 @@ public static int checkMaxStringLength(long length) {
return (int) length;
}

/**
 * Check there is capacity to write data to a stream.
 *
 * @param limit        total capacity limit (expected positive)
 * @param streamLength current stream size (expected non-negative)
 * @param bytes        bytes to add to the stream
 * @throws SystemLimitException if the limit is exceeded.
 */
public static void checkMaxDecompressCapacity(long limit, long streamLength, int bytes) {
  // Compare as "bytes > limit - streamLength" instead of "streamLength + bytes > limit":
  // the subtraction cannot overflow for non-negative inputs, whereas the addition could
  // wrap negative for a huge streamLength and silently bypass this check.
  if (bytes > limit - streamLength) {
    throw new SystemLimitException(
        String.format("Buffer size %,d (bytes) exceeds maximum allowed size %,d.", (streamLength + bytes), limit));
  }
}

/** Reread the limits from the system properties. */
// VisibleForTesting
static void resetLimits() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

import static org.apache.avro.util.NonCopyingByteArrayOutputStream.capacityLimitedOutputStream;

/** * Implements bzip2 compression and decompression. */
public class BZip2Codec extends Codec {

Expand Down Expand Up @@ -60,7 +62,7 @@ public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
compressedData.remaining());

@SuppressWarnings("resource")
NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
NonCopyingByteArrayOutputStream baos = capacityLimitedOutputStream(DEFAULT_BUFFER_SIZE);

try (BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.apache.avro.util.NonCopyingByteArrayOutputStream;

import static org.apache.avro.util.NonCopyingByteArrayOutputStream.capacityLimitedOutputStream;

/**
* Implements DEFLATE (RFC1951) compression and decompression.
*
Expand Down Expand Up @@ -78,7 +80,7 @@ public ByteBuffer compress(ByteBuffer data) throws IOException {

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
NonCopyingByteArrayOutputStream baos = capacityLimitedOutputStream(DEFAULT_BUFFER_SIZE);
try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.xerial.snappy.Snappy;

import org.apache.avro.SystemLimitException;

/** * Implements Snappy compression and decompression. */
public class SnappyCodec extends Codec {
private final CRC32 crc32 = new CRC32();
Expand Down Expand Up @@ -66,7 +68,9 @@ public ByteBuffer compress(ByteBuffer in) throws IOException {
@Override
public ByteBuffer decompress(ByteBuffer in) throws IOException {
int offset = computeOffset(in);
ByteBuffer out = ByteBuffer.allocate(Snappy.uncompressedLength(in.array(), offset, in.remaining() - 4));
final int uncompressedLength = Snappy.uncompressedLength(in.array(), offset, in.remaining() - 4);
SystemLimitException.checkMaxDecompressCapacity(SystemLimitException.MAX_DECOMPRESS_LENGTH, 0, uncompressedLength);
ByteBuffer out = ByteBuffer.allocate(uncompressedLength);
int size = Snappy.uncompress(in.array(), offset, in.remaining() - 4, out.array(), 0);
((Buffer) out).limit(size);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public ByteBuffer compress(ByteBuffer data) throws IOException {

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
NonCopyingByteArrayOutputStream baos = NonCopyingByteArrayOutputStream
.capacityLimitedOutputStream(DEFAULT_BUFFER_SIZE);
InputStream bytesIn = new ByteArrayInputStream(data.array(), computeOffset(data), data.remaining());

try (InputStream ios = new XZCompressorInputStream(bytesIn)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public ByteBuffer compress(ByteBuffer data) throws IOException {

@Override
public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
NonCopyingByteArrayOutputStream baos = NonCopyingByteArrayOutputStream
.capacityLimitedOutputStream(DEFAULT_BUFFER_SIZE);
InputStream bytesIn = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
compressedData.remaining());
try (InputStream ios = ZstandardLoader.input(bytesIn, useBufferPool)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,45 @@

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;

import org.apache.avro.SystemLimitException;

/**
* Utility to make data written to a {@link ByteArrayOutputStream} directly
* available as a {@link ByteBuffer}. Supports limits on the amount of data
* which may be written. All decompressors MUST create capacity-restricted
* streams to prevent maliciously compressed data from triggering memory
* problems across threads.
*
*/
public class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {

/**
* Size limit in bytes; -1 means no limit is enforced.
*/
private final long limit;

/**
* Creates a new byte array output stream, with no size limit.
*
* @param size the initial size
* @throws IllegalArgumentException if size is negative
*/
public NonCopyingByteArrayOutputStream(int size) {
// Delegate with -1, which disables the capacity limit.
this(size, -1);
}

/**
* Creates a new byte array output stream, with a buffer capacity of the
* specified size, in bytes, capacity limit as specified in {@code limit}.
*
* @param size buffer capacity
* @param limit size limit or -1 for no limit.
*/
private NonCopyingByteArrayOutputStream(final int size, final long limit) {
super(size);
// A limit <= 0 is treated as "unlimited" by checkCapacity.
this.limit = limit;
}

/**
Expand All @@ -48,4 +71,65 @@ public NonCopyingByteArrayOutputStream(int size) {
public ByteBuffer asByteBuffer() {
// Wraps the internal buffer directly (no copy); the returned view shares state
// with this stream, so later writes mutate the buffer's contents.
return ByteBuffer.wrap(super.buf, 0, super.count);
}

/**
* Check there is capacity to write data. Throws SystemLimitException if the
* limit is exceeded.
*
* @param bytes bytes to add
*/
private void checkCapacity(int bytes) {
// limit <= 0 means no limit; otherwise delegate the bounds check, which throws
// SystemLimitException when size() + bytes would exceed the limit.
if (limit > 0) {
SystemLimitException.checkMaxDecompressCapacity(limit, size(), bytes);
}
}

@Override
public synchronized void write(final int b) {
// Verify one more byte fits before delegating to the unbounded superclass.
checkCapacity(1);
super.write(b);
}

@Override
public synchronized void write(final byte[] b, final int off, final int len) {
Objects.requireNonNull(b);
// Validate the (off, len) window against b before the capacity check, mirroring the JDK contract.
Objects.checkFromIndexSize(off, len, b.length);
checkCapacity(len);
super.write(b, off, len);
}

@Override
public void writeBytes(final byte[] b) {
Objects.requireNonNull(b);
checkCapacity(b.length);
// NOTE: super.writeBytes is documented to delegate to write(byte[], int, int),
// i.e. the synchronized override above, so capacity is re-checked under the lock.
super.writeBytes(b);
}

/**
* Creates a new byte array output stream, with a buffer capacity of the
* specified size, in bytes. The amount of data which can be written to any
* output stream is limited by the system property
* {@link SystemLimitException#MAX_DECOMPRESS_LENGTH_PROPERTY}
*
* @param size buffer capacity
* @return the output stream
*/
public static NonCopyingByteArrayOutputStream capacityLimitedOutputStream(final int size) {
  // Bound the stream by the globally configured decompression limit; never
  // allocate an initial buffer larger than that limit.
  final long maxBytes = SystemLimitException.MAX_DECOMPRESS_LENGTH;
  final int initialCapacity = (int) Math.min((long) size, maxBytes);
  return new NonCopyingByteArrayOutputStream(initialCapacity, maxBytes);
}

/**
* Creates a new byte array output stream, with a buffer capacity of the
* specified size, in bytes, capacity limit as specified in {@code limit}.
*
* @param size buffer capacity
* @param limit max size of output buffer
* @return the output stream
*/
public static NonCopyingByteArrayOutputStream capacityLimitedOutputStream(final int size, long limit) {
  // A non-positive limit means "unbounded"; otherwise clamp the initial buffer
  // so we never pre-allocate more than the limit allows.
  int initialCapacity = size;
  if (limit > 0) {
    initialCapacity = (int) Math.min((long) size, limit);
  }
  return new NonCopyingByteArrayOutputStream(initialCapacity, limit);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.util;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.junit.jupiter.api.Test;

import org.apache.avro.SystemLimitException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Unit tests for {@code NonCopyingByteArrayOutputStream}, covering both the
 * unlimited constructor and the capacity-limited factory methods.
 */
public class NonCopyingByteArrayOutputStreamTest {

/**
 * Unlimited stream: single-byte and array writes round-trip through asByteBuffer().
 */
@Test
public void testDefaultWriteWorks() throws IOException {
NonCopyingByteArrayOutputStream out = new NonCopyingByteArrayOutputStream(1);
out.write('a');
final byte[] b = "string".getBytes();
out.write(b, 0, b.length);
out.close();
final ByteBuffer buffer = out.asByteBuffer();
assertEquals('a', buffer.get());
for (byte value : b) {
assertEquals(value, buffer.get());
}
}

/**
 * A 4-byte limit: oversized writes are rejected, partial writes up to the
 * limit still succeed, and rejected writes leave the stream unchanged.
 */
@Test
public void testLimitedWrite() throws IOException {
NonCopyingByteArrayOutputStream out = NonCopyingByteArrayOutputStream.capacityLimitedOutputStream(1, 4);
out.write('a');
// writing all 10 bytes would exceed the 4-byte limit, so the call must be rejected up front
final byte[] b = "longstring".getBytes();
assertThrows(SystemLimitException.class, () -> out.write(b),
"Buffer size 11 (bytes) exceeds maximum allowed size 4.");
// the rejected write left the stream untouched, so we can still fill up to the limit
out.write(b, 0, 2);
out.write('z');
// now exactly at the capacity limit, so one more byte must be rejected
assertThrows(SystemLimitException.class, () -> out.write('x'));
out.close();
// validate everything successfully written is there
final ByteBuffer buffer = out.asByteBuffer();
for (byte value : "aloz".getBytes()) {
assertEquals(value, buffer.get());
}
}

/**
 * writeBytes honours the limit: filling to capacity succeeds, one more byte throws.
 */
@Test
public void testLimitedWriteBytes() {
NonCopyingByteArrayOutputStream out = NonCopyingByteArrayOutputStream.capacityLimitedOutputStream(1, 4);
out.writeBytes("abcd".getBytes());
assertThrows(SystemLimitException.class, () -> out.writeBytes("e".getBytes()));
}

/**
 * A requested initial capacity larger than the limit is clamped; the limit still holds.
 */
@Test
public void testInitialCapacityIsClampedToLimit() throws IOException {
NonCopyingByteArrayOutputStream out = NonCopyingByteArrayOutputStream.capacityLimitedOutputStream(1024, 4);
out.write("abcd".getBytes());
assertThrows(SystemLimitException.class, () -> out.write('e'));
}

/**
 * Direct check of the static limit helper: over-limit throws, exactly-at-limit passes.
 */
@Test
public void testInnerLimitCheck() throws Throwable {
assertThrows(SystemLimitException.class, () -> SystemLimitException.checkMaxDecompressCapacity(256L, 0, 100_000));
SystemLimitException.checkMaxDecompressCapacity(256L, 0, 256);
}
}
Loading