From c9e8142fa2cae9c4f1dfc6a5a389a108c4a65051 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 9 Apr 2026 23:28:01 -0700
Subject: [PATCH 1/5] kafka.net.transport: Use memoryview for partial-send slices

---
 kafka/net/transport.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/kafka/net/transport.py b/kafka/net/transport.py
index 9a4075014..99095511e 100644
--- a/kafka/net/transport.py
+++ b/kafka/net/transport.py
@@ -200,6 +200,10 @@ def _sock_send(self):
         err = None
         while self._write_buffer:
             next_chunk = self._write_buffer.popleft()
+            # Wrap in memoryview so partial-send slicing is O(1) instead of
+            # copying the unsent tail on every BlockingIOError / short write.
+            if not isinstance(next_chunk, memoryview):
+                next_chunk = memoryview(next_chunk)
             while next_chunk:
                 try:
                     sent_bytes = self._sock.send(next_chunk)

From 5e73b6038ff544e2ecce6ae823028586e2de262c Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 9 Apr 2026 23:28:42 -0700
Subject: [PATCH 2/5] kafka.protocol.parser: reduce extra bytes copies in send_bytes()

---
 kafka/protocol/parser.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py
index fa8c44277..45338ed40 100644
--- a/kafka/protocol/parser.py
+++ b/kafka/protocol/parser.py
@@ -68,10 +68,18 @@ def send_request(self, request, correlation_id=None):
 
     def send_bytes(self):
         """Retrieve all pending bytes to send on the network"""
-        data = b''.join(self.bytes_to_send)
-        self.bytes_to_send = []
-        if data:
-            log.debug('%s Send: %r', self._ident, data)
+        # Short-circuit the common single-request case to avoid an extra
+        # full-request copy through b''.join.
+        n = len(self.bytes_to_send)
+        if n == 0:
+            return b''
+        if n == 1:
+            data = self.bytes_to_send[0]
+            self.bytes_to_send = []
+        else:
+            data = b''.join(self.bytes_to_send)
+            self.bytes_to_send = []
+        log.debug('%s Send: %r', self._ident, data)
         return data
 
     def receive_bytes(self, data):

From 36454daec2ac126f9954801b3dde4a6d47a69b89 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 9 Apr 2026 23:29:10 -0700
Subject: [PATCH 3/5] Avoid extra bytes copy in EncodeBuffer.result()

---
 kafka/protocol/metadata/api_versions.py               | 2 +-
 kafka/protocol/schemas/fields/codecs/encode_buffer.py | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/kafka/protocol/metadata/api_versions.py b/kafka/protocol/metadata/api_versions.py
index 4c2d91f95..e9edaea65 100644
--- a/kafka/protocol/metadata/api_versions.py
+++ b/kafka/protocol/metadata/api_versions.py
@@ -17,7 +17,7 @@ def encode_header(self, flexible=False):
     # ApiVersionsResponse body always decodes as version 0 when error is present
     @classmethod
     def decode(cls, data, version=None, header=False, framed=False):
-        if isinstance(data, bytes):
+        if not hasattr(data, 'tell'):
             data = io.BytesIO(data)
         if framed:
             size = Int32.decode(data)
diff --git a/kafka/protocol/schemas/fields/codecs/encode_buffer.py b/kafka/protocol/schemas/fields/codecs/encode_buffer.py
index bd44f0b4e..5db2b12ea 100644
--- a/kafka/protocol/schemas/fields/codecs/encode_buffer.py
+++ b/kafka/protocol/schemas/fields/codecs/encode_buffer.py
@@ -21,7 +21,11 @@ def ensure(self, needed):
            self.buf = new_buf
 
     def result(self):
-        return bytes(self.buf[:self.pos])
+        # Return a bytearray slice (one copy) rather than bytes(self.buf[:pos])
+        # (two copies — the slice creates a bytearray, then bytes() copies
+        # again). Downstream consumers (protocol codec slice assignment,
+        # socket.send) accept bytearray transparently.
+        return self.buf[:self.pos]
 
 
 class EncodeBufferPool:
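
Illustrative sketch (not part of the patch series; the buffer size, offset, and
repeat count below are arbitrary): patches 1-3 all avoid re-copying payload
bytes by slicing views instead of containers. Slicing a bytearray copies the
unsent tail on every short write, while slicing a memoryview is constant-time,
and socket.send() accepts either type.

import timeit

payload = bytearray(1024 * 1024)            # ~1 MiB pending request
view = memoryview(payload)

def slice_bytearray(buf=payload):
    return buf[100:]                        # new bytearray, copies the ~1 MiB tail

def slice_memoryview(mv=view):
    return mv[100:]                         # new memoryview, no payload copy

print('bytearray slice :', timeit.timeit(slice_bytearray, number=1000))
print('memoryview slice:', timeit.timeit(slice_memoryview, number=1000))
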
From 5920fe367cbffc39ad6e35d415cd306c14919778 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 10 Apr 2026 09:03:13 -0700
Subject: [PATCH 4/5] kafka.record: avoid bytes/bytearray copies calling calc_crc32

---
 kafka/record/default_records.py | 6 ++++--
 kafka/record/legacy_records.py  | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 507de00cf..3577df391 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -359,7 +359,7 @@ def validate_crc(self):
 
         crc = self.crc
         data_view = memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:]
-        verify_crc = calc_crc32c(data_view.tobytes())
+        verify_crc = calc_crc32c(data_view)
         return crc == verify_crc
 
     def __str__(self):
@@ -646,7 +646,9 @@ def write_header(self, use_compression_type=True):
             self._base_sequence,
             self._num_records
         )
-        crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:])
+        # Use memoryview to avoid a full-body copy of ~batch_size bytes.
+        # The decode path at _check_crc already does this.
+        crc = calc_crc32c(memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:])
         struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc)
 
     def _maybe_compress(self):
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index b6e141e3b..b0aff1c1f 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -181,7 +181,8 @@ def magic(self):
         return self._magic
 
     def validate_crc(self):
-        crc = calc_crc32(self._buffer[self.MAGIC_OFFSET:])
+        # memoryview avoids a full-body copy when slicing the bytearray.
+        crc = calc_crc32(memoryview(self._buffer)[self.MAGIC_OFFSET:])
         return self._crc == crc
 
     def _decompress(self, key_offset):
@@ -249,7 +250,10 @@ def _read_key_value(self, pos):
         return key, value
 
     def _crc_bytes(self, msg_pos, length):
-        return self._buffer[msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]
+        # memoryview avoids copying the message bytes out of the batch buffer
+        # just to hand them to calc_crc32 later in LegacyRecord.validate_crc.
+        return memoryview(self._buffer)[
+            msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]
 
     def __iter__(self):
         if self._magic == 1:
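
Illustrative sketch (zlib.crc32 here is only a stand-in for the project's
calc_crc32 / calc_crc32c helpers, and the offset is made up): patch 4 relies on
the fact that a checksum computed over a memoryview slice equals the checksum
over a copied slice, so the copy can simply be dropped.

import zlib

batch = bytearray(b'\x00' * 16 + b'record payload bytes')
MAGIC_OFFSET = 16                                # arbitrary for this sketch

copied = bytes(batch[MAGIC_OFFSET:])             # materializes a copy of the tail
viewed = memoryview(batch)[MAGIC_OFFSET:]        # zero-copy view of the same tail

assert zlib.crc32(copied) == zlib.crc32(viewed)
print('CRC over the view matches CRC over the copied slice')
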
""" - if not isinstance(data, array.array) or data.itemsize != 1: - buf = array.array("B", data) - else: + # Iterate directly over bytes / bytearray / memoryview(format='B'), + # which all yield ints in [0, 255] natively. Only fall back to the + # array.array copy for other input types (e.g. a string on py2, or + # a generic iterable of ints). + if isinstance(data, (bytes, bytearray)): buf = data - crc = crc ^ _MASK + elif isinstance(data, memoryview): + if data.format != 'B' or data.itemsize != 1: + buf = data.cast('B') # reinterpret as bytes, still zero-copy + else: + buf = data + elif isinstance(data, array.array) and data.itemsize == 1: + buf = data + else: + buf = array.array("B", data) + crc ^= _M for b in buf: - table_index = (crc ^ b) & 0xff - crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK - return crc ^ _MASK + crc = (_TABLE[(crc ^ b) & 0xff] ^ (crc >> 8)) & _M + return crc ^ _M def crc_finalize(crc):