Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions kafka/benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ def print_stats(self):
if self.raw_metrics:
pprint.pprint(metrics)
else:
print('{records-consumed-rate} records/sec ({bytes-consumed-rate} B/sec),'
' {fetch-latency-avg} latency,'
' {fetch-rate} fetch/s,'
' {fetch-size-avg} fetch size,'
' {records-lag-max} max record lag,'
' {records-per-request-avg} records/req'
print('{records-consumed-rate:.0f} records/sec ({bytes-consumed-rate:.0f} B/sec),'
' {fetch-latency-avg:.0f}ms avg latency,'
' {fetch-rate:.0f} avg fetch requests/sec,'
' {fetch-size-avg:.0f} avg fetch size,'
' {records-lag-max:.0f} max record lag,'
' {records-per-request-avg:.0f} avg records/req'
.format(**metrics['consumer-fetch-manager-metrics']))


Expand Down
201 changes: 201 additions & 0 deletions kafka/benchmarks/producer_encode_path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""Benchmarks for the producer encode hot path.

Measures the cost of finalizing a record batch and encoding it into a
ProduceRequest — the pipeline that runs on every send to the broker.

To compare two implementations (e.g. before/after a change) run this
script twice and diff the output:

# baseline
git stash
python -m kafka.benchmarks.producer_encode_path -o baseline.json
git stash pop

# new
python -m kafka.benchmarks.producer_encode_path -o new.json

pyperf compare_to baseline.json new.json

For an allocation count (tracemalloc) run with --allocations, which is
a separate mode (not a timing benchmark).
"""
import argparse
import sys
import tracemalloc

import pyperf

from kafka.producer.producer_batch import ProducerBatch
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition
from kafka.protocol.producer import ProduceRequest


# Default workload shape; overridable via --records / --value-size.
DEFAULT_RECORDS_PER_BATCH = 100
DEFAULT_VALUE_SIZE = 128  # bytes per record value
DEFAULT_BATCH_SIZE = 1 << 20  # 1 MiB — big enough to hold the largest case


def _build_unclosed_batch(num_records, value_size):
    """Build a ProducerBatch with *num_records* appended but not yet closed.

    Arguments:
        num_records (int): number of records to append to the batch.
        value_size (int): size in bytes of each record value.

    Returns:
        ProducerBatch: an open (un-finalized) batch ready for close()/encode.

    Raises:
        ValueError: if the batch fills up before *num_records* are appended.
    """
    tp = TopicPartition('bench', 0)
    records = MemoryRecordsBuilder(
        magic=2, compression_type=0, batch_size=DEFAULT_BATCH_SIZE)
    batch = ProducerBatch(tp, records)
    value = b'x' * value_size
    for i in range(num_records):
        future = batch.try_append(i, None, value, [])
        # try_append returns None when the batch is full.  Fail loudly with a
        # real exception: a bare `assert` here would be stripped under
        # `python -O`, silently benchmarking a short batch.
        if future is None:
            raise ValueError('batch too small for %d records' % num_records)
    return batch


def _build_closed_batch(num_records, value_size):
    """Like :func:`_build_unclosed_batch`, but with the record set finalized."""
    built = _build_unclosed_batch(num_records, value_size)
    built.records.close()
    return built


def bench_build_and_close(loops, num_records, value_size):
    """Time build + close — isolates the batch-finalization path.

    close() on its own is so cheap that pyperf would need many thousands
    of loops to resolve it; including the build cost and diffing per-run
    results exposes the close() delta instead.
    """
    payload = b'x' * value_size
    partition = TopicPartition('bench', 0)

    start = pyperf.perf_counter()
    for _ in range(loops):
        builder = MemoryRecordsBuilder(
            magic=2, compression_type=0, batch_size=DEFAULT_BATCH_SIZE)
        pending = ProducerBatch(partition, builder)
        for seq in range(num_records):
            pending.try_append(seq, None, payload, [])
        pending.records.close()
    return pyperf.perf_counter() - start


def bench_encode(loops, num_records, value_size):
    """Time ProduceRequest.encode() against an already-closed batch.

    Separates pure protocol-encoding cost from batch finalization.
    """
    closed = _build_closed_batch(num_records, value_size)
    wire_buf = closed.records.buffer()
    per_topic = [('bench', [(0, wire_buf)])]

    start = pyperf.perf_counter()
    for _ in range(loops):
        request = ProduceRequest[8](
            transactional_id=None,
            acks=1,
            timeout_ms=30000,
            topic_data=per_topic,
        )
        request.with_header(correlation_id=1, client_id='bench')
        request.encode(framed=True, header=True)
    return pyperf.perf_counter() - start


def bench_full_pipeline(loops, num_records, value_size):
    """Time the full producer hot path: append -> close -> encode.

    Mirrors the work the Sender thread performs for each drained batch.
    """
    payload = b'x' * value_size
    partition = TopicPartition('bench', 0)

    start = pyperf.perf_counter()
    for _ in range(loops):
        builder = MemoryRecordsBuilder(
            magic=2, compression_type=0, batch_size=DEFAULT_BATCH_SIZE)
        pending = ProducerBatch(partition, builder)
        for seq in range(num_records):
            pending.try_append(seq, None, payload, [])
        pending.records.close()
        request = ProduceRequest[8](
            transactional_id=None,
            acks=1,
            timeout_ms=30000,
            topic_data=[('bench', [(0, pending.records.buffer())])],
        )
        request.with_header(correlation_id=1, client_id='bench')
        request.encode(framed=True, header=True)
    return pyperf.perf_counter() - start


def report_allocations(num_records, value_size):
    """Use tracemalloc to count bytes allocated during a single encode."""
    # Build the request up front so only the encode path is measured.
    closed = _build_closed_batch(num_records, value_size)
    per_topic = [('bench', [(0, closed.records.buffer())])]
    request = ProduceRequest[8](
        transactional_id=None,
        acks=1,
        timeout_ms=30000,
        topic_data=per_topic,
    )
    request.with_header(correlation_id=1, client_id='bench')

    # One throwaway encode populates the compiled-encoder cache so its
    # one-time allocations stay out of the measurement below.
    request.encode(framed=True, header=True)

    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    wire = request.encode(framed=True, header=True)
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()

    deltas = after.compare_to(before, 'lineno')
    grown_bytes = sum(max(0, d.size_diff) for d in deltas)
    grown_allocs = sum(max(0, d.count_diff) for d in deltas)

    print('Config : %d records x %d bytes' % (num_records, value_size))
    print('Wire bytes : %d' % len(wire))
    print('Allocated : %d bytes across %d allocations' % (grown_bytes, grown_allocs))
    print()
    print('Top allocation sites:')
    for delta in sorted(deltas, key=lambda d: d.size_diff, reverse=True)[:10]:
        if delta.size_diff <= 0:
            continue
        top_frame = delta.traceback[0]
        site = '%s:%d' % (top_frame.filename.split('/workspace/')[-1], top_frame.lineno)
        print(' %8d bytes / %3d allocs %s' % (delta.size_diff, delta.count_diff, site))


def main():
    """Parse CLI options and dispatch to the allocation report or pyperf."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--records', type=int, default=DEFAULT_RECORDS_PER_BATCH,
                        help='records per batch (default: %(default)d)')
    parser.add_argument('--value-size', type=int, default=DEFAULT_VALUE_SIZE,
                        help='record value size in bytes (default: %(default)d)')
    parser.add_argument('--allocations', action='store_true',
                        help='report tracemalloc allocations for one encode (not a timing run)')

    # Unrecognized flags (--output, --rigorous, ...) belong to pyperf.
    known, passthrough = parser.parse_known_args()

    if known.allocations:
        report_allocations(known.records, known.value_size)
        return

    # pyperf re-parses sys.argv itself, so hand it only its own flags.
    sys.argv = [sys.argv[0]] + passthrough

    runner = pyperf.Runner()
    runner.metadata['records_per_batch'] = known.records
    runner.metadata['value_size_bytes'] = known.value_size
    runner.bench_time_func(
        'build_and_close', bench_build_and_close, known.records, known.value_size)
    runner.bench_time_func(
        'encode', bench_encode, known.records, known.value_size)
    runner.bench_time_func(
        'full_pipeline', bench_full_pipeline, known.records, known.value_size)


# Entry point for both direct execution and
# `python -m kafka.benchmarks.producer_encode_path`.
if __name__ == '__main__':
    main()
25 changes: 18 additions & 7 deletions kafka/benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,18 @@ def run(args):
print()

def _benchmark():
# Hoist attribute lookups and use positional args to minimize
# per-iteration harness overhead. With a well-optimized
# producer.send(), kwargs dict packing and repeated attribute
# dereference on `args`/`producer` dominate main-thread CPU
# inside this loop and mask the library's true throughput.
results = []
for i in range(args.num_records):
results.append(producer.send(topic=args.topic, value=record))
send = producer.send
append = results.append
topic = args.topic
num_records = args.num_records
for _ in range(num_records):
append(send(topic, record))
print("Send complete...")
producer.flush()
producer.close()
Expand Down Expand Up @@ -95,11 +104,13 @@ def print_stats(self):
if self.raw_metrics:
pprint.pprint(metrics)
else:
print('{record-send-rate} records/sec ({byte-rate} B/sec),'
' {request-latency-avg} latency,'
' {record-size-avg} record size,'
' {batch-size-avg} batch size,'
' {records-per-request-avg} records/req'
print('{record-send-rate:.0f} records/sec ({byte-rate:.0f} B/sec),'
' {request-rate:.0f} avg requests/sec,'
' {request-latency-avg:.0f}ms avg latency,'
' {throttle-time-max:.0f}ms max throttle,'
' {record-size-avg:.0f} avg record size,'
' {batch-size-avg:.0f} avg batch size,'
' {records-per-request-avg:.0f} avg records/req'
.format(**metrics['producer-metrics']))

def print_final(self):
Expand Down
Loading