Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions kafka/protocol/schemas/fields/codecs/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ class Bytes:

@classmethod
def encode(cls, value, compact=False):
if value is not None and not isinstance(value, bytes):
if value is not None and not isinstance(value, (bytes, bytearray, memoryview)):
value = value.encode()
if compact:
if value is None:
return UnsignedVarInt32.encode(0)
return UnsignedVarInt32.encode(len(value) + 1) + value
return UnsignedVarInt32.encode(len(value) + 1) + bytes(value)
if value is None:
return Int32.encode(-1)
return Int32.encode(len(value)) + value
return Int32.encode(len(value)) + bytes(value)

@classmethod
def encode_into(cls, out, value, compact=False):
if value is not None and not isinstance(value, bytes):
if value is not None and not isinstance(value, (bytes, bytearray, memoryview)):
value = value.encode()
if compact:
if value is None:
Expand All @@ -257,7 +257,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
bn = ctx.next_var('bn')
if compact:
ctx.emit(indent, '%s = %s' % (bv, val_expr))
ctx.emit(indent, 'if %s is not None and not isinstance(%s, bytes): %s = %s.encode()' % (bv, bv, bv, bv))
ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv))
ctx.emit(indent, 'if %s is None:' % bv)
ctx.emit(indent, ' buf[pos] = 0')
ctx.emit(indent, ' pos += 1')
Expand All @@ -273,7 +273,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
ctx.emit(indent, ' pos += %s' % bn)
else:
ctx.emit(indent, '%s = %s' % (bv, val_expr))
ctx.emit(indent, 'if %s is not None and not isinstance(%s, bytes): %s = %s.encode()' % (bv, bv, bv, bv))
ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv))
ctx.emit(indent, 'if %s is None:' % bv)
ctx.emit(indent, " pack_into('>i', buf, pos, -1)")
ctx.emit(indent, ' pos += 4')
Expand Down
6 changes: 5 additions & 1 deletion kafka/record/memory_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ def close(self):
# see Issue 718
if not self._closed:
self._bytes_written = self._builder.size()
self._buffer = bytes(self._builder.build())
# Keep the buffer as bytearray to avoid a full-batch copy on
# close. Downstream consumers (MemoryRecords via memoryview and
# the protocol encoder via slice-assignment) handle bytearray
# without further copies.
self._buffer = self._builder.build()
if self._magic == 2:
self._producer_id = self._builder.producer_id
self._producer_epoch = self._builder.producer_epoch
Expand Down
Loading