Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/apidoc/BrokerConnection.rst

This file was deleted.

2 changes: 0 additions & 2 deletions docs/apidoc/KafkaClient.rst

This file was deleted.

2 changes: 0 additions & 2 deletions docs/apidoc/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ kafka-python API
KafkaConsumer
KafkaProducer
KafkaAdminClient
KafkaClient
BrokerConnection
ClusterMetadata
OffsetAndMetadata
TopicPartition
7 changes: 4 additions & 3 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ class KafkaAdminClient(
providing a file, only the leaf certificate will be checked against
this CRL. Default: None.
api_version (tuple): Specify which Kafka API version to use. If set
to None, KafkaClient will attempt to infer the broker version by
probing various APIs. Example: (0, 10, 2). Default: None
to None, KafkaConnectionManager will attempt to infer the
broker version by probing various APIs. Example: (0, 10, 2).
Default: None
bootstrap_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when bootstrapping.
Default: 2000.
Expand All @@ -149,7 +150,7 @@ class KafkaAdminClient(
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
token provider instance. Default: None
socks5_proxy (str): Socks5 proxy url. Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances
kafka_client (callable): Custom class / callable for creating KafkaNetClient instances
"""
DEFAULT_CONFIG = {
# client configs
Expand Down
1,155 changes: 0 additions & 1,155 deletions kafka/client_async.py

This file was deleted.

72 changes: 71 additions & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import logging
import random
import re
import socket
import threading
import time

from kafka import errors as Errors
from kafka.conn import get_ip_port_afi
from kafka.future import Future
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
from kafka.structs import TopicPartition
Expand Down Expand Up @@ -484,3 +484,73 @@ def collect_hosts(hosts, randomize=True):
if randomize:
random.shuffle(result)
return result


def _address_family(address):
"""
Attempt to determine the family of an address (or hostname)

:return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
could not be determined
"""
if address.startswith('[') and address.endswith(']'):
return socket.AF_INET6
for af in (socket.AF_INET, socket.AF_INET6):
try:
socket.inet_pton(af, address)
return af
except (ValueError, AttributeError, socket.error):
continue
return socket.AF_UNSPEC


DEFAULT_KAFKA_PORT = 9092


def get_ip_port_afi(host_and_port_str):
"""
Parse the IP and port from a string in the format of:

* host_or_ip <- Can be either IPv4 address literal or hostname/fqdn
* host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn
* [host_or_ip] <- IPv6 address literal
* [host_or_ip]:port. <- IPv6 address literal

.. note:: IPv6 address literals with ports *must* be enclosed in brackets

.. note:: If the port is not specified, default will be returned.

:return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
"""
host_and_port_str = host_and_port_str.strip()
if host_and_port_str.startswith('['):
af = socket.AF_INET6
host, rest = host_and_port_str[1:].split(']')
if rest:
port = int(rest[1:])
else:
port = DEFAULT_KAFKA_PORT
return host, port, af
else:
if ':' not in host_and_port_str:
af = _address_family(host_and_port_str)
return host_and_port_str, DEFAULT_KAFKA_PORT, af
else:
# now we have something with a colon in it and no square brackets. It could be
# either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
try:
# if it decodes as an IPv6 address, use that
socket.inet_pton(socket.AF_INET6, host_and_port_str)
return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
except AttributeError:
log.warning('socket.inet_pton not available on this platform.'
' consider `pip install win_inet_pton`')
pass
except (ValueError, socket.error):
# it's a host:port pair
pass
host, port = host_and_port_str.rsplit(':', 1)
port = int(port)

af = _address_family(host)
return host, port, af
Loading
Loading