diff --git a/kafka/protocol/schemas/fields/codecs/types.py b/kafka/protocol/schemas/fields/codecs/types.py index 718fb826c..1e504501f 100644 --- a/kafka/protocol/schemas/fields/codecs/types.py +++ b/kafka/protocol/schemas/fields/codecs/types.py @@ -219,19 +219,19 @@ class Bytes: @classmethod def encode(cls, value, compact=False): - if value is not None and not isinstance(value, bytes): + if value is not None and not isinstance(value, (bytes, bytearray, memoryview)): value = value.encode() if compact: if value is None: return UnsignedVarInt32.encode(0) - return UnsignedVarInt32.encode(len(value) + 1) + value + return UnsignedVarInt32.encode(len(value) + 1) + bytes(value) if value is None: return Int32.encode(-1) - return Int32.encode(len(value)) + value + return Int32.encode(len(value)) + bytes(value) @classmethod def encode_into(cls, out, value, compact=False): - if value is not None and not isinstance(value, bytes): + if value is not None and not isinstance(value, (bytes, bytearray, memoryview)): value = value.encode() if compact: if value is None: @@ -257,7 +257,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False): bn = ctx.next_var('bn') if compact: ctx.emit(indent, '%s = %s' % (bv, val_expr)) - ctx.emit(indent, 'if %s is not None and not isinstance(%s, bytes): %s = %s.encode()' % (bv, bv, bv, bv)) + ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv)) ctx.emit(indent, 'if %s is None:' % bv) ctx.emit(indent, ' buf[pos] = 0') ctx.emit(indent, ' pos += 1') @@ -273,7 +273,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False): ctx.emit(indent, ' pos += %s' % bn) else: ctx.emit(indent, '%s = %s' % (bv, val_expr)) - ctx.emit(indent, 'if %s is not None and not isinstance(%s, bytes): %s = %s.encode()' % (bv, bv, bv, bv)) + ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv)) ctx.emit(indent, 'if %s is None:' % bv) ctx.emit(indent, " pack_into('>i', buf, pos, -1)") ctx.emit(indent, ' pos += 4') diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index 3ef2c3bfc..4919d22f0 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -207,7 +207,11 @@ def close(self): # see Issue 718 if not self._closed: self._bytes_written = self._builder.size() - self._buffer = bytes(self._builder.build()) + # Keep the buffer as bytearray to avoid a full-batch copy on + # close. Downstream consumers (MemoryRecords via memoryview and + # the protocol encoder via slice-assignment) handle bytearray + # without further copies. + self._buffer = self._builder.build() if self._magic == 2: self._producer_id = self._builder.producer_id self._producer_epoch = self._builder.producer_epoch