diff --git a/Makefile b/Makefile index acc1f175e..1d69efad8 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,9 @@ test: build-integration fixture: build-integration python -m test.integration.fixtures kafka +sasl: build-integration + python -m test.integration.fixtures kafka --sasl + test-coverage: build-integration pytest --cov=kafka --cov-report html test @echo "open file://`pwd`/htmlcov/index.html" diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index de24d5d8b..de4bdb40e 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -8,27 +8,14 @@ from .configs import ConfigsSubCommand from .consumer_groups import ConsumerGroupsSubCommand from .topics import TopicsSubCommand +from ..common import add_common_cli_args def main_parser(): parser = argparse.ArgumentParser( prog='python -m kafka.admin', description='Kafka admin client', ) - parser.add_argument( - '-b', '--bootstrap-servers', type=str, action='append', required=True, - help='host:port for cluster bootstrap servers') - parser.add_argument( - '-c', '--extra-config', type=str, action='append', - help='additional configuration properties for admin client') - parser.add_argument( - '-l', '--log-level', type=str, default='CRITICAL', - help='logging level, passed to logging.basicConfig') - parser.add_argument( - '-L', '--enable-logger', type=str, action='append', - help='enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled') - parser.add_argument( - '-DL', '--disable-logger', type=str, action='append', - help='disable a specific logger. Can be provided multiple times.') + add_common_cli_args(parser) parser.add_argument( '-f', '--format', type=str, default='raw', help='output format: raw|json') @@ -84,7 +71,13 @@ def run_cli(args=None): logger = logging.getLogger(__name__) kwargs = build_kwargs(config.extra_config) - client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers, **kwargs) + client = KafkaAdminClient( + bootstrap_servers=config.bootstrap_servers, + security_protocol=config.security_protocol, + sasl_mechanism=config.sasl_mechanism, + sasl_plain_username=config.sasl_user, + sasl_plain_password=config.sasl_password, + **kwargs) try: result = config.command(client, config) if config.format == 'raw': diff --git a/kafka/cli/common.py b/kafka/cli/common.py new file mode 100644 index 000000000..88fdf543e --- /dev/null +++ b/kafka/cli/common.py @@ -0,0 +1,27 @@ +def add_common_cli_args(parser): + connect_group = parser.add_argument_group('connection') + connect_group.add_argument( + '-b', '--bootstrap-servers', type=str, action='append', required=True, + help='host:port for cluster bootstrap server. Can be provided multiple times.') + connect_group.add_argument( + '-s', '--security-protocol', type=str, default='PLAINTEXT', help='PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL') + connect_group.add_argument( + '-m', '--sasl-mechanism', type=str, default='PLAIN', help='PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512') + connect_group.add_argument( + '-u', '--sasl-user', type=str) + connect_group.add_argument( + '-p', '--sasl-password', type=str) + logging_group = parser.add_argument_group('logging') + logging_group.add_argument( + '-l', '--log-level', type=str, default='CRITICAL', + help='logging level, passed to logging.basicConfig') + logging_group.add_argument( + '-L', '--enable-logger', type=str, action='append', + help='enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled') + logging_group.add_argument( + '-DL', '--disable-logger', type=str, action='append', + help='disable a specific logger. Can be provided multiple times.') + extended_group = parser.add_argument_group('extended') + extended_group.add_argument( + '-c', '--extra-config', type=str, action='append', + help='additional configuration properties for client in "key=val" format. Can be provided multiple times.') diff --git a/kafka/cli/consumer/__init__.py b/kafka/cli/consumer/__init__.py index fa979af40..2336cad0c 100644 --- a/kafka/cli/consumer/__init__.py +++ b/kafka/cli/consumer/__init__.py @@ -3,6 +3,7 @@ from kafka import KafkaConsumer from kafka.errors import KafkaError +from ..common import add_common_cli_args def main_parser(): @@ -10,27 +11,13 @@ def main_parser(): prog='python -m kafka.consumer', description='Kafka console consumer', ) - parser.add_argument( - '-b', '--bootstrap-servers', type=str, action='append', required=True, - help='host:port for cluster bootstrap server. Can be provided multiple times.') + add_common_cli_args(parser) parser.add_argument( '-t', '--topic', type=str, action='append', dest='topics', required=True, help='subscribe to topic') parser.add_argument( '-g', '--group', type=str, required=True, help='consumer group') - parser.add_argument( - '-c', '--extra-config', type=str, action='append', - help='additional configuration properties for kafka consumer') - parser.add_argument( - '-l', '--log-level', type=str, default='CRITICAL', - help='logging level, passed to logging.basicConfig') - parser.add_argument( - '-L', '--enable-logger', type=str, action='append', - help='enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled') - parser.add_argument( - '-DL', '--disable-logger', type=str, action='append', - help='disable a specific logger. Can be provided multiple times.') parser.add_argument( '-f', '--format', type=str, default='str', help='output format: str|raw|full') diff --git a/kafka/cli/producer/__init__.py b/kafka/cli/producer/__init__.py index 71c906ef7..cb2ea73da 100644 --- a/kafka/cli/producer/__init__.py +++ b/kafka/cli/producer/__init__.py @@ -3,6 +3,7 @@ import sys from kafka import KafkaProducer +from ..common import add_common_cli_args def main_parser(): @@ -10,24 +11,10 @@ def main_parser(): prog='python -m kafka.producer', description='Kafka console producer', ) - parser.add_argument( - '-b', '--bootstrap-servers', type=str, action='append', required=True, - help='host:port for cluster bootstrap servers') + add_common_cli_args(parser) parser.add_argument( '-t', '--topic', type=str, required=True, help='publish to topic') - parser.add_argument( - '-c', '--extra-config', type=str, action='append', - help='additional configuration properties for kafka producer') - parser.add_argument( - '-l', '--log-level', type=str, default='CRITICAL', - help='logging level, passed to logging.basicConfig') - parser.add_argument( - '-L', '--enable-logger', type=str, action='append', - help='enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled') - parser.add_argument( - '-DL', '--disable-logger', type=str, action='append', - help='disable a specific logger. Can be provided multiple times.') parser.add_argument( '--encoding', type=str, default='utf-8', help='byte encoding for produced messages') diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 3021596dd..03e37553a 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -54,7 +54,6 @@ def factory(**broker_params): broker = KafkaFixture.instance(node_id, **params) _brokers.append(broker) return broker - yield factory zks = set() @@ -66,7 +65,6 @@ def factory(**broker_params): zk.close() - @pytest.fixture def kafka_client(kafka_broker, request): """Return a KafkaClient fixture""" diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 3e44cd56c..30246f90a 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -727,9 +727,13 @@ def get_api_versions(): zk.close() -def run_brokers(): +def run_brokers(args=()): logging.basicConfig(level=logging.ERROR) - k = KafkaFixture.instance(0) + params = {} + if len(args) == 1 and args[0] == '--sasl': + params['transport'] = "SASL_PLAINTEXT" + params['sasl_mechanism'] = 'SCRAM-SHA-512' + k = KafkaFixture.instance(0, **params) zk = k.zookeeper print("Kafka", k.kafka_version, "running on port:", k.port) @@ -752,7 +756,7 @@ def run_brokers(): if cmd == 'get_api_versions': get_api_versions() elif cmd == 'kafka': - run_brokers() + run_brokers(sys.argv[2:]) else: print("Unknown cmd: %s", cmd) exit(1)