Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka/net/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ def _sock_send(self):
err = None
while self._write_buffer:
next_chunk = self._write_buffer.popleft()
# Wrap in memoryview so partial-send slicing is O(1) instead of
# copying the unsent tail on every BlockingIOError / short write.
if not isinstance(next_chunk, memoryview):
next_chunk = memoryview(next_chunk)
while next_chunk:
try:
sent_bytes = self._sock.send(next_chunk)
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/metadata/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def encode_header(self, flexible=False):
# ApiVersionsResponse body always decodes as version 0 when error is present
@classmethod
def decode(cls, data, version=None, header=False, framed=False):
if isinstance(data, bytes):
if not hasattr(data, 'tell'):
data = io.BytesIO(data)
if framed:
size = Int32.decode(data)
Expand Down
16 changes: 12 additions & 4 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,18 @@ def send_request(self, request, correlation_id=None):

def send_bytes(self):
    """Retrieve all pending bytes to send on the network"""
    # Fast-path the common single-request case: hand back the lone chunk
    # directly instead of copying it through b''.join.
    pending = self.bytes_to_send
    if not pending:
        return b''
    self.bytes_to_send = []
    data = pending[0] if len(pending) == 1 else b''.join(pending)
    log.debug('%s Send: %r', self._ident, data)
    return data

def receive_bytes(self, data):
Expand Down
6 changes: 5 additions & 1 deletion kafka/protocol/schemas/fields/codecs/encode_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def ensure(self, needed):
self.buf = new_buf

def result(self):
    """Return the bytes written so far as a bytearray slice.

    A single slice copy, rather than bytes(self.buf[:pos]) which would
    copy twice (slice -> bytearray, then bytes() again). Downstream
    consumers (protocol codec slice assignment, socket.send) accept a
    bytearray transparently.
    """
    written = self.pos
    return self.buf[:written]


class EncodeBufferPool:
Expand Down
34 changes: 25 additions & 9 deletions kafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,39 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(crc, data,
# Pull hot globals into the local namespace; the inner loop
# runs once per byte and LOAD_FAST is measurably faster than
# LOAD_GLOBAL in CPython.
_TABLE=CRC_TABLE, _M=_MASK):
"""Update CRC-32C checksum with data.

Args:
crc: 32-bit checksum to update as long.
data: byte array, string or iterable over bytes.
data: bytes, bytearray, memoryview, array.array("B"), or any
iterable yielding ints in [0, 255].
Returns:
32-bit updated CRC-32C as long.
"""
if not isinstance(data, array.array) or data.itemsize != 1:
buf = array.array("B", data)
else:
# Iterate directly over bytes / bytearray / memoryview(format='B'),
# which all yield ints in [0, 255] natively. Only fall back to the
# array.array copy for other input types (e.g. a string on py2, or
# a generic iterable of ints).
if isinstance(data, (bytes, bytearray)):
buf = data
crc = crc ^ _MASK
elif isinstance(data, memoryview):
if data.format != 'B' or data.itemsize != 1:
buf = data.cast('B') # reinterpret as bytes, still zero-copy
else:
buf = data
elif isinstance(data, array.array) and data.itemsize == 1:
buf = data
else:
buf = array.array("B", data)
crc ^= _M
for b in buf:
table_index = (crc ^ b) & 0xff
crc = (CRC_TABLE[table_index] ^ (crc >> 8)) & _MASK
return crc ^ _MASK
crc = (_TABLE[(crc ^ b) & 0xff] ^ (crc >> 8)) & _M
return crc ^ _M


def crc_finalize(crc):
Expand Down
6 changes: 4 additions & 2 deletions kafka/record/default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def validate_crc(self):

crc = self.crc
data_view = memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:]
verify_crc = calc_crc32c(data_view.tobytes())
verify_crc = calc_crc32c(data_view)
return crc == verify_crc

def __str__(self):
Expand Down Expand Up @@ -646,7 +646,9 @@ def write_header(self, use_compression_type=True):
self._base_sequence,
self._num_records
)
crc = calc_crc32c(self._buffer[self.ATTRIBUTES_OFFSET:])
# Use memoryview to avoid a full-body copy of ~batch_size bytes.
# The decode path at _check_crc already does this.
crc = calc_crc32c(memoryview(self._buffer)[self.ATTRIBUTES_OFFSET:])
struct.pack_into(">I", self._buffer, self.CRC_OFFSET, crc)

def _maybe_compress(self):
Expand Down
8 changes: 6 additions & 2 deletions kafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ def magic(self):
return self._magic

def validate_crc(self):
    """Recompute the CRC over the message body and compare to the stored CRC.

    The body is sliced through a memoryview so no copy of the message
    bytes is made.
    """
    body = memoryview(self._buffer)[self.MAGIC_OFFSET:]
    return calc_crc32(body) == self._crc

def _decompress(self, key_offset):
Expand Down Expand Up @@ -249,7 +250,10 @@ def _read_key_value(self, pos):
return key, value

def _crc_bytes(self, msg_pos, length):
return self._buffer[msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]
# memoryview avoids copying the message bytes out of the batch buffer
# just to hand them to calc_crc32 later in LegacyRecord.validate_crc.
return memoryview(self._buffer)[
msg_pos + self.MAGIC_OFFSET:msg_pos + self.LOG_OVERHEAD + length]

def __iter__(self):
if self._magic == 1:
Expand Down
Loading