diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index 9d104b6a9..7cd505055 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -160,7 +160,7 @@ def __init__( permission_type, resource_pattern ): - super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern) + super().__init__(principal, host, operation, permission_type, resource_pattern) self.validate() def validate(self): @@ -231,7 +231,7 @@ def __init__( resource_name, pattern_type=ACLResourcePatternType.LITERAL ): - super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type) + super().__init__(resource_type, resource_name, pattern_type) self.validate() def validate(self): diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py index c570703d8..5ee265cca 100644 --- a/kafka/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -71,7 +71,7 @@ def run(args): class StatsReporter(threading.Thread): def __init__(self, interval, consumer, event=None, raw_metrics=False): - super(StatsReporter, self).__init__() + super().__init__() self.interval = interval self.consumer = consumer self.event = event diff --git a/kafka/benchmarks/load_example.py b/kafka/benchmarks/load_example.py index 6f24e9b37..02bf80adc 100644 --- a/kafka/benchmarks/load_example.py +++ b/kafka/benchmarks/load_example.py @@ -11,7 +11,7 @@ class Producer(threading.Thread): def __init__(self, bootstrap_servers, topic, stop_event, msg_size): - super(Producer, self).__init__() + super().__init__() self.bootstrap_servers = bootstrap_servers self.topic = topic self.stop_event = stop_event @@ -30,7 +30,7 @@ def run(self): class Consumer(threading.Thread): def __init__(self, bootstrap_servers, topic, stop_event, msg_size): - super(Consumer, self).__init__() + super().__init__() self.bootstrap_servers = bootstrap_servers self.topic = topic self.stop_event = stop_event diff --git a/kafka/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py index 87993082b..f44f89cd0 100644 --- a/kafka/benchmarks/producer_performance.py +++ b/kafka/benchmarks/producer_performance.py @@ -82,7 +82,7 @@ def _benchmark(): class StatsReporter(threading.Thread): def __init__(self, interval, producer, event=None, raw_metrics=False): - super(StatsReporter, self).__init__() + super().__init__() self.interval = interval self.producer = producer self.event = event diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8b1d1e0eb..185549b5b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1089,7 +1089,7 @@ def __init__(self, heartbeat, metrics, prefix, tags=None): class HeartbeatThread(threading.Thread): def __init__(self, coordinator): - super(HeartbeatThread, self).__init__() + super().__init__() self.name = coordinator.group_id + '-heartbeat' self.coordinator = coordinator self.enabled = False diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index afb207000..da2ed2b97 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -86,7 +86,7 @@ def __init__(self, client, subscription, **configs): True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True """ - super(ConsumerCoordinator, self).__init__(client, **configs) + super().__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -149,7 +149,7 @@ def __del__(self): self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) except TypeError: pass - super(ConsumerCoordinator, self).__del__() + super().__del__() def protocol_type(self): return ConsumerProtocolType @@ -403,7 +403,7 @@ def need_rejoin(self): and self._joined_subscription != self._subscription.subscription): return True - return super(ConsumerCoordinator, self).need_rejoin() + return super().need_rejoin() def refresh_committed_offsets_if_needed(self, timeout_ms=None): """Fetch committed offsets for assigned partitions.""" @@ -477,7 +477,7 @@ def close(self, autocommit=True, timeout_ms=None): if autocommit: self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms) finally: - super(ConsumerCoordinator, self).close(timeout_ms=timeout_ms) + super().close(timeout_ms=timeout_ms) def _invoke_completed_offset_commit_callbacks(self): if self._async_commit_fenced: diff --git a/kafka/errors.py b/kafka/errors.py index 96fdcb223..09691d536 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -11,7 +11,7 @@ def __str__(self): if not self.args: return self.__class__.__name__ return '{0}: {1}'.format(self.__class__.__name__, - super(KafkaError, self).__str__()) + super().__str__()) def __eq__(self, other): return self.__class__ == other.__class__ and self.args == other.args @@ -26,7 +26,7 @@ def __init__(self, *args): if not args: args = ("Commit cannot be completed since the group has already" " rebalanced and assigned the partitions to another member.",) - super(CommitFailedError, self).__init__(*args) + super().__init__(*args) class IllegalArgumentError(KafkaError): @@ -113,7 +113,7 @@ def __str__(self): """Add errno to standard KafkaError str""" return '[Error {0}] {1}'.format( self.errno, - super(BrokerResponseError, self).__str__()) + super().__str__()) class AuthorizationError(BrokerResponseError): diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 50fabbb3c..a5096ac48 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -7,17 +7,17 @@ class FutureProduceResult(Future): def __init__(self, topic_partition): - super(FutureProduceResult, self).__init__() + super().__init__() self.topic_partition = topic_partition self._latch = threading.Event() def success(self, value): - ret = super(FutureProduceResult, self).success(value) + ret = super().success(value) self._latch.set() return ret def failure(self, error): - ret = super(FutureProduceResult, self).failure(error) + ret = super().failure(error) self._latch.set() return ret @@ -28,7 +28,7 @@ def wait(self, timeout=None): class FutureRecordMetadata(Future): def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): - super(FutureRecordMetadata, self).__init__() + super().__init__() self._produce_future = produce_future # packing args as a tuple is a minor speed optimization self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 067010a12..6ec27f71d 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -42,7 +42,7 @@ class Sender(threading.Thread): } def __init__(self, client, metadata, accumulator, **configs): - super(Sender, self).__init__() + super().__init__() self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 46a447ece..6661a4519 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -524,7 +524,7 @@ def exception(self): return self._error -class TxnRequestHandler(object, metaclass=abc.ABCMeta): +class TxnRequestHandler(metaclass=abc.ABCMeta): def __init__(self, transaction_manager, result=None): self.transaction_manager = transaction_manager self.retry_backoff_ms = transaction_manager.retry_backoff_ms @@ -612,7 +612,7 @@ def priority(self): class InitProducerIdHandler(TxnRequestHandler): def __init__(self, transaction_manager, transaction_timeout_ms): - super(InitProducerIdHandler, self).__init__(transaction_manager) + super().__init__(transaction_manager) if transaction_manager._api_version >= (2, 0): version = 1 @@ -645,7 +645,7 @@ def handle_response(self, response): class AddPartitionsToTxnHandler(TxnRequestHandler): def __init__(self, transaction_manager, topic_partitions): - super(AddPartitionsToTxnHandler, self).__init__(transaction_manager) + super().__init__(transaction_manager) if transaction_manager._api_version >= (2, 7): version = 2 @@ -740,7 +740,7 @@ def maybe_override_retry_backoff_ms(self): class FindCoordinatorHandler(TxnRequestHandler): def __init__(self, transaction_manager, coord_type, coord_key): - super(FindCoordinatorHandler, self).__init__(transaction_manager) + super().__init__(transaction_manager) self._coord_type = coord_type self._coord_key = coord_key @@ -796,7 +796,7 @@ def handle_response(self, response): class EndTxnHandler(TxnRequestHandler): def __init__(self, transaction_manager, committed): - super(EndTxnHandler, self).__init__(transaction_manager) + super().__init__(transaction_manager) if self.transaction_manager._api_version >= (2, 7): version = 2 @@ -837,7 +837,7 @@ def handle_response(self, response): class AddOffsetsToTxnHandler(TxnRequestHandler): def __init__(self, transaction_manager, consumer_group_id, offsets): - super(AddOffsetsToTxnHandler, self).__init__(transaction_manager) + super().__init__(transaction_manager) self.consumer_group_id = consumer_group_id self.offsets = offsets @@ -887,7 +887,7 @@ def handle_response(self, response): class TxnOffsetCommitHandler(TxnRequestHandler): def __init__(self, transaction_manager, consumer_group_id, offsets, result): - super(TxnOffsetCommitHandler, self).__init__(transaction_manager, result=result) + super().__init__(transaction_manager, result=result) self.consumer_group_id = consumer_group_id self.offsets = offsets diff --git a/kafka/protocol/frame.py b/kafka/protocol/frame.py index 7b4a32bcf..10ebf7c9b 100644 --- a/kafka/protocol/frame.py +++ b/kafka/protocol/frame.py @@ -1,6 +1,6 @@ class KafkaBytes(bytearray): def __init__(self, size): - super(KafkaBytes, self).__init__(size) + super().__init__(size) self._idx = 0 def read(self, nbytes=None): diff --git a/kafka/protocol/old/abstract.py b/kafka/protocol/old/abstract.py index 5817673cb..529a73d35 100644 --- a/kafka/protocol/old/abstract.py +++ b/kafka/protocol/old/abstract.py @@ -1,7 +1,7 @@ import abc -class AbstractType(object, metaclass=abc.ABCMeta): +class AbstractType(metaclass=abc.ABCMeta): @classmethod @abc.abstractmethod def encode(cls, value): # pylint: disable=no-self-argument diff --git a/kafka/protocol/old/api_versions.py b/kafka/protocol/old/api_versions.py index a70851842..ec780537d 100644 --- a/kafka/protocol/old/api_versions.py +++ b/kafka/protocol/old/api_versions.py @@ -30,7 +30,7 @@ def decode(cls, data, header=False, framed=False): data.seek(curr) if err != 0: return ApiVersionsResponse_v0.decode(data, header=header, framed=framed) - return super(BaseApiVersionsResponse, cls).decode(data, header=header, framed=framed) + return super().decode(data, header=header, framed=framed) class ApiVersionsResponse_v0(Response): diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 908ad30b7..21da8e928 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -1,7 +1,7 @@ import abc -class ABCRecord(object, metaclass=abc.ABCMeta): +class ABCRecord(metaclass=abc.ABCMeta): __slots__ = () @abc.abstractproperty @@ -52,7 +52,7 @@ def headers(self): """ -class ABCRecordBatchBuilder(object, metaclass=abc.ABCMeta): +class ABCRecordBatchBuilder(metaclass=abc.ABCMeta): __slots__ = () @abc.abstractmethod @@ -91,7 +91,7 @@ def build(self): """ -class ABCRecordBatch(object, metaclass=abc.ABCMeta): +class ABCRecordBatch(metaclass=abc.ABCMeta): """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe compressed) message. """ @@ -119,7 +119,7 @@ def magic(self): """ -class ABCRecords(object, metaclass=abc.ABCMeta): +class ABCRecords(metaclass=abc.ABCMeta): __slots__ = () @abc.abstractmethod diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 496cd8bfe..2df422757 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -450,7 +450,7 @@ class ControlRecord(DefaultRecord): ) def __init__(self, size_in_bytes, offset, timestamp, timestamp_type, key, value, headers): - super(ControlRecord, self).__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers) + super().__init__(size_in_bytes, offset, timestamp, timestamp_type, key, value, headers) (self._version, self._type) = self.KEY_STRUCT.unpack(self._key) # see https://kafka.apache.org/documentation/#controlbatch diff --git a/kafka/sasl/abc.py b/kafka/sasl/abc.py index 30a58d888..7d47bcfcc 100644 --- a/kafka/sasl/abc.py +++ b/kafka/sasl/abc.py @@ -1,7 +1,7 @@ import abc -class SaslMechanism(object, metaclass=abc.ABCMeta): +class SaslMechanism(metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, **config): pass diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index d2b0131d6..ed28a955e 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -190,14 +190,14 @@ def instance(cls, host=None, port=None, external=False): return fixture def __init__(self, host, port, external=False, tmp_dir=None): - super(ZookeeperFixture, self).__init__() + super().__init__() self.host = host self.port = port self.running = external self.tmp_dir = tmp_dir def kafka_run_class_env(self): - env = super(ZookeeperFixture, self).kafka_run_class_env() + env = super().kafka_run_class_env() env['LOG_DIR'] = self.tmp_dir.join('logs').strpath return env @@ -304,7 +304,7 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None, replicas=1, partitions=2, transport='PLAINTEXT', sasl_mechanism=None, auto_create_topic=True, tmp_dir=None, external=False): - super(KafkaFixture, self).__init__() + super().__init__() self.external = external self.running = False self.host = host @@ -465,7 +465,7 @@ def bootstrap_server(self): return '%s:%d' % (self.host, self.port) def kafka_run_class_env(self): - env = super(KafkaFixture, self).kafka_run_class_env() + env = super().kafka_run_class_env() env['LOG_DIR'] = self.tmp_dir.join('logs').strpath return env @@ -624,7 +624,7 @@ def close(self): self.out("Done!") def dump_logs(self): - super(KafkaFixture, self).dump_logs() + super().dump_logs() self.zookeeper.dump_logs() def _format_log_dirs(self): diff --git a/test/service.py b/test/service.py index 2113de209..c751e4c7f 100644 --- a/test/service.py +++ b/test/service.py @@ -35,7 +35,7 @@ def wait_for(self, pattern, timeout=30): class SpawnedService(threading.Thread): def __init__(self, args=None, env=None): - super(SpawnedService, self).__init__() + super().__init__() if args is None: raise TypeError("args parameter is required")