Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def __init__(
permission_type,
resource_pattern
):
super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
super().__init__(principal, host, operation, permission_type, resource_pattern)
self.validate()

def validate(self):
Expand Down Expand Up @@ -231,7 +231,7 @@ def __init__(
resource_name,
pattern_type=ACLResourcePatternType.LITERAL
):
super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
super().__init__(resource_type, resource_name, pattern_type)
self.validate()

def validate(self):
Expand Down
2 changes: 1 addition & 1 deletion kafka/benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def run(args):

class StatsReporter(threading.Thread):
def __init__(self, interval, consumer, event=None, raw_metrics=False):
super(StatsReporter, self).__init__()
super().__init__()
self.interval = interval
self.consumer = consumer
self.event = event
Expand Down
4 changes: 2 additions & 2 deletions kafka/benchmarks/load_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class Producer(threading.Thread):

def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
super(Producer, self).__init__()
super().__init__()
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.stop_event = stop_event
Expand All @@ -30,7 +30,7 @@ def run(self):

class Consumer(threading.Thread):
def __init__(self, bootstrap_servers, topic, stop_event, msg_size):
super(Consumer, self).__init__()
super().__init__()
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.stop_event = stop_event
Expand Down
2 changes: 1 addition & 1 deletion kafka/benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _benchmark():

class StatsReporter(threading.Thread):
def __init__(self, interval, producer, event=None, raw_metrics=False):
super(StatsReporter, self).__init__()
super().__init__()
self.interval = interval
self.producer = producer
self.event = event
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ def __init__(self, heartbeat, metrics, prefix, tags=None):

class HeartbeatThread(threading.Thread):
def __init__(self, coordinator):
super(HeartbeatThread, self).__init__()
super().__init__()
self.name = coordinator.group_id + '-heartbeat'
self.coordinator = coordinator
self.enabled = False
Expand Down
8 changes: 4 additions & 4 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(self, client, subscription, **configs):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
super().__init__(client, **configs)

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -149,7 +149,7 @@ def __del__(self):
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
except TypeError:
pass
super(ConsumerCoordinator, self).__del__()
super().__del__()

def protocol_type(self):
return ConsumerProtocolType
Expand Down Expand Up @@ -403,7 +403,7 @@ def need_rejoin(self):
and self._joined_subscription != self._subscription.subscription):
return True

return super(ConsumerCoordinator, self).need_rejoin()
return super().need_rejoin()

def refresh_committed_offsets_if_needed(self, timeout_ms=None):
"""Fetch committed offsets for assigned partitions."""
Expand Down Expand Up @@ -477,7 +477,7 @@ def close(self, autocommit=True, timeout_ms=None):
if autocommit:
self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms)
finally:
super(ConsumerCoordinator, self).close(timeout_ms=timeout_ms)
super().close(timeout_ms=timeout_ms)

def _invoke_completed_offset_commit_callbacks(self):
if self._async_commit_fenced:
Expand Down
6 changes: 3 additions & 3 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __str__(self):
if not self.args:
return self.__class__.__name__
return '{0}: {1}'.format(self.__class__.__name__,
super(KafkaError, self).__str__())
super().__str__())

def __eq__(self, other):
return self.__class__ == other.__class__ and self.args == other.args
Expand All @@ -26,7 +26,7 @@ def __init__(self, *args):
if not args:
args = ("Commit cannot be completed since the group has already"
" rebalanced and assigned the partitions to another member.",)
super(CommitFailedError, self).__init__(*args)
super().__init__(*args)


class IllegalArgumentError(KafkaError):
Expand Down Expand Up @@ -113,7 +113,7 @@ def __str__(self):
"""Add errno to standard KafkaError str"""
return '[Error {0}] {1}'.format(
self.errno,
super(BrokerResponseError, self).__str__())
super().__str__())


class AuthorizationError(BrokerResponseError):
Expand Down
8 changes: 4 additions & 4 deletions kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

class FutureProduceResult(Future):
def __init__(self, topic_partition):
super(FutureProduceResult, self).__init__()
super().__init__()
self.topic_partition = topic_partition
self._latch = threading.Event()

def success(self, value):
ret = super(FutureProduceResult, self).success(value)
ret = super().success(value)
self._latch.set()
return ret

def failure(self, error):
ret = super(FutureProduceResult, self).failure(error)
ret = super().failure(error)
self._latch.set()
return ret

Expand All @@ -28,7 +28,7 @@ def wait(self, timeout=None):

class FutureRecordMetadata(Future):
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super(FutureRecordMetadata, self).__init__()
super().__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Sender(threading.Thread):
}

def __init__(self, client, metadata, accumulator, **configs):
super(Sender, self).__init__()
super().__init__()
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
Expand Down
14 changes: 7 additions & 7 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def exception(self):
return self._error


class TxnRequestHandler(object, metaclass=abc.ABCMeta):
class TxnRequestHandler(metaclass=abc.ABCMeta):
def __init__(self, transaction_manager, result=None):
self.transaction_manager = transaction_manager
self.retry_backoff_ms = transaction_manager.retry_backoff_ms
Expand Down Expand Up @@ -612,7 +612,7 @@ def priority(self):

class InitProducerIdHandler(TxnRequestHandler):
def __init__(self, transaction_manager, transaction_timeout_ms):
super(InitProducerIdHandler, self).__init__(transaction_manager)
super().__init__(transaction_manager)

if transaction_manager._api_version >= (2, 0):
version = 1
Expand Down Expand Up @@ -645,7 +645,7 @@ def handle_response(self, response):

class AddPartitionsToTxnHandler(TxnRequestHandler):
def __init__(self, transaction_manager, topic_partitions):
super(AddPartitionsToTxnHandler, self).__init__(transaction_manager)
super().__init__(transaction_manager)

if transaction_manager._api_version >= (2, 7):
version = 2
Expand Down Expand Up @@ -740,7 +740,7 @@ def maybe_override_retry_backoff_ms(self):

class FindCoordinatorHandler(TxnRequestHandler):
def __init__(self, transaction_manager, coord_type, coord_key):
super(FindCoordinatorHandler, self).__init__(transaction_manager)
super().__init__(transaction_manager)

self._coord_type = coord_type
self._coord_key = coord_key
Expand Down Expand Up @@ -796,7 +796,7 @@ def handle_response(self, response):

class EndTxnHandler(TxnRequestHandler):
def __init__(self, transaction_manager, committed):
super(EndTxnHandler, self).__init__(transaction_manager)
super().__init__(transaction_manager)

if self.transaction_manager._api_version >= (2, 7):
version = 2
Expand Down Expand Up @@ -837,7 +837,7 @@ def handle_response(self, response):

class AddOffsetsToTxnHandler(TxnRequestHandler):
def __init__(self, transaction_manager, consumer_group_id, offsets):
super(AddOffsetsToTxnHandler, self).__init__(transaction_manager)
super().__init__(transaction_manager)

self.consumer_group_id = consumer_group_id
self.offsets = offsets
Expand Down Expand Up @@ -887,7 +887,7 @@ def handle_response(self, response):

class TxnOffsetCommitHandler(TxnRequestHandler):
def __init__(self, transaction_manager, consumer_group_id, offsets, result):
super(TxnOffsetCommitHandler, self).__init__(transaction_manager, result=result)
super().__init__(transaction_manager, result=result)

self.consumer_group_id = consumer_group_id
self.offsets = offsets
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/frame.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
class KafkaBytes(bytearray):
def __init__(self, size):
super(KafkaBytes, self).__init__(size)
super().__init__(size)
self._idx = 0

def read(self, nbytes=None):
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/old/abstract.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class AbstractType(object, metaclass=abc.ABCMeta):
class AbstractType(metaclass=abc.ABCMeta):
@classmethod
@abc.abstractmethod
def encode(cls, value): # pylint: disable=no-self-argument
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/old/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def decode(cls, data, header=False, framed=False):
data.seek(curr)
if err != 0:
return ApiVersionsResponse_v0.decode(data, header=header, framed=framed)
return super(BaseApiVersionsResponse, cls).decode(data, header=header, framed=framed)
return super().decode(data, header=header, framed=framed)


class ApiVersionsResponse_v0(Response):
Expand Down
8 changes: 4 additions & 4 deletions kafka/record/abc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class ABCRecord(object, metaclass=abc.ABCMeta):
class ABCRecord(metaclass=abc.ABCMeta):
__slots__ = ()

@abc.abstractproperty
Expand Down Expand Up @@ -52,7 +52,7 @@ def headers(self):
"""


class ABCRecordBatchBuilder(object, metaclass=abc.ABCMeta):
class ABCRecordBatchBuilder(metaclass=abc.ABCMeta):
__slots__ = ()

@abc.abstractmethod
Expand Down Expand Up @@ -91,7 +91,7 @@ def build(self):
"""


class ABCRecordBatch(object, metaclass=abc.ABCMeta):
class ABCRecordBatch(metaclass=abc.ABCMeta):
""" For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe
compressed) message.
"""
Expand Down Expand Up @@ -119,7 +119,7 @@ def magic(self):
"""


class ABCRecords(object, metaclass=abc.ABCMeta):
class ABCRecords(metaclass=abc.ABCMeta):
__slots__ = ()

@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion kafka/record/default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ class ControlRecord(DefaultRecord):
)

def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers):
super(ControlRecord, self).__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers)
super().__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers)
(self._version, self._type) = self.KEY_STRUCT.unpack(self._key)

# see https://kafka.apache.org/documentation/#controlbatch
Expand Down
2 changes: 1 addition & 1 deletion kafka/sasl/abc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc


class SaslMechanism(object, metaclass=abc.ABCMeta):
class SaslMechanism(metaclass=abc.ABCMeta):
@abc.abstractmethod
def __init__(self, **config):
pass
Expand Down
10 changes: 5 additions & 5 deletions test/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ def instance(cls, host=None, port=None, external=False):
return fixture

def __init__(self, host, port, external=False, tmp_dir=None):
super(ZookeeperFixture, self).__init__()
super().__init__()
self.host = host
self.port = port
self.running = external
self.tmp_dir = tmp_dir

def kafka_run_class_env(self):
env = super(ZookeeperFixture, self).kafka_run_class_env()
env = super().kafka_run_class_env()
env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env

Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,
replicas=1, partitions=2, transport='PLAINTEXT',
sasl_mechanism=None, auto_create_topic=True,
tmp_dir=None, external=False):
super(KafkaFixture, self).__init__()
super().__init__()
self.external = external
self.running = False
self.host = host
Expand Down Expand Up @@ -465,7 +465,7 @@ def bootstrap_server(self):
return '%s:%d' % (self.host, self.port)

def kafka_run_class_env(self):
env = super(KafkaFixture, self).kafka_run_class_env()
env = super().kafka_run_class_env()
env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env

Expand Down Expand Up @@ -624,7 +624,7 @@ def close(self):
self.out("Done!")

def dump_logs(self):
super(KafkaFixture, self).dump_logs()
super().dump_logs()
self.zookeeper.dump_logs()

def _format_log_dirs(self):
Expand Down
2 changes: 1 addition & 1 deletion test/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def wait_for(self, pattern, timeout=30):

class SpawnedService(threading.Thread):
def __init__(self, args=None, env=None):
super(SpawnedService, self).__init__()
super().__init__()

if args is None:
raise TypeError("args parameter is required")
Expand Down
Loading