Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka/protocol/admin/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class AlterPartitionReassignmentsResponse(ApiMessage): pass
class ListPartitionReassignmentsRequest(ApiMessage): pass
class ListPartitionReassignmentsResponse(ApiMessage): pass

# DescribeTopicPartitions (API key 75 per DescribeTopicPartitionsRequest.json).
# NOTE(review): bodies are `pass` like the sibling message classes — field
# definitions are presumably attached by the ApiMessage machinery from the
# bundled JSON schema; confirm against the message-generation code.
class DescribeTopicPartitionsRequest(ApiMessage): pass
class DescribeTopicPartitionsResponse(ApiMessage): pass

class DeleteRecordsRequest(ApiMessage): pass
class DeleteRecordsResponse(ApiMessage): pass

Expand All @@ -48,6 +51,7 @@ class ElectionType(IntEnum):
'AlterPartitionRequest', 'AlterPartitionResponse',
'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse',
'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse',
'DescribeTopicPartitionsRequest', 'DescribeTopicPartitionsResponse',
'DeleteRecordsRequest', 'DeleteRecordsResponse',
'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType',
]
157 changes: 156 additions & 1 deletion kafka/protocol/admin/topics.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from enum import IntEnum
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType']
__all__ = ['CreateTopicsRequest', 'CreateTopicsResponse', 'DeleteTopicsRequest', 'DeleteTopicsResponse', 'CreatePartitionsRequest', 'CreatePartitionsResponse', 'AlterPartitionRequest', 'AlterPartitionResponse', 'AlterPartitionReassignmentsRequest', 'AlterPartitionReassignmentsResponse', 'ListPartitionReassignmentsRequest', 'ListPartitionReassignmentsResponse', 'DescribeTopicPartitionsRequest', 'DescribeTopicPartitionsResponse', 'DeleteRecordsRequest', 'DeleteRecordsResponse', 'ElectLeadersRequest', 'ElectLeadersResponse', 'ElectionType']

class CreateTopicsRequest(ApiMessage):
class CreatableTopic(DataContainer):
Expand Down Expand Up @@ -746,6 +746,161 @@ class ListPartitionReassignmentsResponse(ApiMessage):
def expect_response(self) -> bool: ...
def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class DescribeTopicPartitionsRequest(ApiMessage):
    """Typed stub for the DescribeTopicPartitions request.

    Mirrors the DescribeTopicPartitionsRequest.json schema (apiKey 75,
    validVersions "0"): a list of topic names, a partition-count limit,
    and an optional pagination cursor.
    """

    class TopicRequest(DataContainer):
        # The topic name to fetch details for.
        name: str
        def __init__(
            self,
            *args: Any,
            name: str = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    class Cursor(DataContainer):
        # First topic/partition to fetch details for (pagination start point).
        topic_name: str
        partition_index: int
        def __init__(
            self,
            *args: Any,
            topic_name: str = ...,
            partition_index: int = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    # Topics to describe.
    topics: list[TopicRequest]
    # Maximum number of partitions included in the response (schema default 2000).
    response_partition_limit: int
    # Nullable in the schema (nullableVersions "0+", default null).
    cursor: Cursor | None
    def __init__(
        self,
        *args: Any,
        topics: list[TopicRequest] = ...,
        response_partition_limit: int = ...,
        cursor: Cursor | None = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    # (min, max) supported protocol versions.
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class DescribeTopicPartitionsResponse(ApiMessage):
    """Typed stub for the DescribeTopicPartitions response (API key 75).

    Per-topic results with nested per-partition replica/ISR details, plus an
    optional cursor for fetching the next page.
    """

    class DescribeTopicPartitionsResponseTopic(DataContainer):
        class DescribeTopicPartitionsResponsePartition(DataContainer):
            # Partition-level error code (0 = no error).
            error_code: int
            partition_index: int
            leader_id: int
            leader_epoch: int
            replica_nodes: list[int]
            isr_nodes: list[int]
            # Nullable lists — presumably only populated when ELR is
            # enabled on the broker; confirm against the response schema.
            eligible_leader_replicas: list[int] | None
            last_known_elr: list[int] | None
            offline_replicas: list[int]
            def __init__(
                self,
                *args: Any,
                error_code: int = ...,
                partition_index: int = ...,
                leader_id: int = ...,
                leader_epoch: int = ...,
                replica_nodes: list[int] = ...,
                isr_nodes: list[int] = ...,
                eligible_leader_replicas: list[int] | None = ...,
                last_known_elr: list[int] | None = ...,
                offline_replicas: list[int] = ...,
                version: int | None = None,
                **kwargs: Any,
            ) -> None: ...
            @property
            def version(self) -> int | None: ...
            def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

        # Topic-level error code (0 = no error).
        error_code: int
        # Nullable: the broker may identify the topic by id only.
        name: str | None
        topic_id: uuid.UUID
        is_internal: bool
        partitions: list[DescribeTopicPartitionsResponsePartition]
        # 32-bit bitfield of authorized operations.
        topic_authorized_operations: int
        def __init__(
            self,
            *args: Any,
            error_code: int = ...,
            name: str | None = ...,
            topic_id: uuid.UUID = ...,
            is_internal: bool = ...,
            partitions: list[DescribeTopicPartitionsResponsePartition] = ...,
            topic_authorized_operations: int = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    class Cursor(DataContainer):
        # Next topic/partition to request when results were truncated.
        topic_name: str
        partition_index: int
        def __init__(
            self,
            *args: Any,
            topic_name: str = ...,
            partition_index: int = ...,
            version: int | None = None,
            **kwargs: Any,
        ) -> None: ...
        @property
        def version(self) -> int | None: ...
        def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...

    throttle_time_ms: int
    topics: list[DescribeTopicPartitionsResponseTopic]
    # None when there are no further pages.
    next_cursor: Cursor | None
    def __init__(
        self,
        *args: Any,
        throttle_time_ms: int = ...,
        topics: list[DescribeTopicPartitionsResponseTopic] = ...,
        next_cursor: Cursor | None = ...,
        version: int | None = None,
        **kwargs: Any,
    ) -> None: ...
    @property
    def version(self) -> int | None: ...
    def to_dict(self, meta: bool = False, json: bool = True) -> dict: ...
    name: str
    type: str
    API_KEY: int
    API_VERSION: int
    # (min, max) supported protocol versions.
    valid_versions: tuple[int, int]
    min_version: int
    max_version: int
    @property
    def header(self) -> Any: ...
    @classmethod
    def is_request(cls) -> bool: ...
    def expect_response(self) -> bool: ...
    def with_header(self, correlation_id: int = 0, client_id: str = "kafka-python") -> None: ...

class DeleteRecordsRequest(ApiMessage):
class DeleteRecordsTopic(DataContainer):
class DeleteRecordsPartition(DataContainer):
Expand Down
6 changes: 4 additions & 2 deletions kafka/protocol/data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ def _to_dict_vals(self, meta=False, json=True):
if self._version is not None and not field.for_version_q(self._version):
continue
if field.is_struct():
yield (field.name, dict(getattr(self, field.name)._to_dict_vals(meta=meta, json=json)))
val = getattr(self, field.name)
yield (field.name, None if val is None else dict(val._to_dict_vals(meta=meta, json=json)))
elif field.is_struct_array():
yield (field.name, [dict(val._to_dict_vals(meta=meta, json=json)) for val in getattr(self, field.name)])
val = getattr(self, field.name)
yield (field.name, None if val is None else [dict(v._to_dict_vals(meta=meta, json=json)) for v in val])
else:
val = getattr(self, field.name)
yield (field.name, field.to_json(val) if json else val)
Expand Down
5 changes: 5 additions & 0 deletions kafka/protocol/schemas/fields/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def max_version(self):
def for_version_q(self, version):
    """Return True when *version* lies within this field's declared
    (inclusive) version range."""
    first, last = self._versions[0], self._versions[1]
    return first <= version and version <= last

def nullable_for_version_q(self, version):
    """Return True when the schema marks this field nullable at *version*.

    Fields with no nullableVersions range are never nullable.
    """
    nullable = self._nullable_versions
    return nullable is not None and nullable[0] <= version <= nullable[1]

def tagged_field_q(self, version):
if self._tag is None or self._tagged_versions is None:
return False
Expand Down
51 changes: 50 additions & 1 deletion kafka/protocol/schemas/fields/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ def untagged_fields(self, version):
return self._untagged_fields_cache[version]

def encode(self, item, version=None, compact=False, tagged=False):
# Nested nullable struct: 1-byte prefix (0 = null, 1 = present).
# Top-level message structs never have nullableVersions set, so this
# check is safe without a top-level guard.
if self.nullable_for_version_q(version):
if item is None:
return b'\x00'
prefix = b'\x01'
else:
prefix = b''
fields = self.untagged_fields(version)
if isinstance(item, tuple):
getter = lambda item, i, field: item[i]
Expand All @@ -94,10 +103,26 @@ def encode(self, item, version=None, compact=False, tagged=False):
encoded.append(self.tagged_fields(version).encode(tags, version=version))
elif tagged is None:
encoded.append(TaggedFields.encode_empty())
return b''.join(encoded)
return prefix + b''.join(encoded)

def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False,
tagged=False, tuple_access=False):
# Top-level struct (item_expr == 'item') has its nullability handled
# by the parent struct; only inline null-prefix when this is a nested
# nullable struct field.
inline_nullable = (
self.nullable_for_version_q(version)
and item_expr != 'item'
and not tuple_access
)
if inline_nullable:
ctx.emit(indent, 'if %s is None:' % item_expr)
ctx.emit(indent, ' buf[pos] = 0')
ctx.emit(indent, ' pos += 1')
ctx.emit(indent, 'else:')
ctx.emit(indent, ' buf[pos] = 1')
ctx.emit(indent, ' pos += 1')
indent = indent + ' '
fields = self.untagged_fields(version)
for i, field in enumerate(fields):
if tuple_access:
Expand All @@ -117,6 +142,14 @@ def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False,
ctx.emit(indent, 'pos += 1')

def encode_into(self, item, out, version=None, compact=False, tagged=False):
if self.nullable_for_version_q(version):
out.ensure(1)
if item is None:
out.buf[out.pos] = 0
out.pos += 1
return
out.buf[out.pos] = 1
out.pos += 1
fields = self.untagged_fields(version)
if isinstance(item, tuple):
for i, field in enumerate(fields):
Expand Down Expand Up @@ -162,6 +195,19 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t

Batches adjacent batchable fields into single unpack_from calls.
"""
# Top-level struct decode (var_name == 'obj') has no outer null-prefix;
# only a nested nullable struct field consumes one.
inline_nullable = (
self.nullable_for_version_q(version)
and var_name != 'obj'
)
if inline_nullable:
ctx.emit(indent, 'if data[pos] == 0:')
ctx.emit(indent, ' pos += 1')
ctx.emit(indent, ' %s = None' % var_name)
ctx.emit(indent, 'else:')
ctx.emit(indent, ' pos += 1')
indent = indent + ' '
fields = self.untagged_fields(version)
data_class = self.data_class

Expand Down Expand Up @@ -279,6 +325,9 @@ def compiled_decode_from(self, version, compact=False, tagged=False, data_class=
return self._compiled_encoders[key]

def decode(self, data, version=None, compact=False, tagged=False, data_class=None):
if self.nullable_for_version_q(version):
if data.read(1) == b'\x00':
return None
if data_class is None:
data_class = self.data_class
decoded = {
Expand Down
4 changes: 4 additions & 0 deletions kafka/protocol/schemas/fields/struct_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def __init__(self, json, array_of=None):
array_of = self.parse_inner_type(json)
assert array_of is not None, 'json does not contain a StructArray!'
super().__init__(json, array_of=array_of)
# nullableVersions on the JSON describes the array's nullability, not
# the inner struct's. Clear it on the inner struct so StructField does
# not try to emit a per-element null-prefix when encoding/decoding.
array_of._nullable_versions = None
# map_key will be (idx, field) of the mapKey field if found
self.map_key = next(filter(lambda x: x[1]._json.get('mapKey'), enumerate(self._fields)), None)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 75,
"type": "request",
"listeners": ["broker"],
"name": "DescribeTopicPartitionsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
"about": "The topics to fetch details for.",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." }
]
},
{ "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", "default": "2000",
"about": "The maximum number of partitions included in the response." },
{ "name": "Cursor", "type": "Cursor", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The first topic and partition index to fetch details for.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The name for the first topic to process." },
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index to start with." }
]}
]
}
Loading
Loading