diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 249a29f0a..eff561a6c 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -91,6 +91,8 @@ def run_cli(args=None): if config.format == 'raw': pprint(result) elif config.format == 'json': + if hasattr(result, 'to_dict'): + result = result.to_dict() print(json.dumps(result)) return 0 except Exception: diff --git a/kafka/cli/admin/topics/delete.py b/kafka/cli/admin/topics/delete.py index 4193b65c0..ff2d93450 100644 --- a/kafka/cli/admin/topics/delete.py +++ b/kafka/cli/admin/topics/delete.py @@ -6,8 +6,8 @@ class DeleteTopic: @classmethod def add_subparser(cls, subparsers): parser = subparsers.add_parser('delete', help='Delete Kafka Topic') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='topic name') - parser.add_argument('--id', type=str, action='append', dest='topic_ids', help='topic UUID') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[], help='topic name') + parser.add_argument('--id', type=str, action='append', dest='topic_ids', default=[], help='topic UUID') parser.set_defaults(command=cls.command) @classmethod diff --git a/kafka/protocol/data_container.py b/kafka/protocol/data_container.py index 0c7a31e7e..5e0060d2b 100644 --- a/kafka/protocol/data_container.py +++ b/kafka/protocol/data_container.py @@ -142,12 +142,7 @@ def _to_dict_vals(self, meta=False, json=True): yield (field.name, [dict(val._to_dict_vals(meta=meta, json=json)) for val in getattr(self, field.name)]) else: val = getattr(self, field.name) - if json: - if isinstance(val, bytes): - val = val.decode() - elif isinstance(val, set): - val = list(val) - yield (field.name, val) + yield (field.name, field.to_json(val) if json else val) def to_dict(self, meta=False, json=True): """Use meta=True to include top-level version; meta='all' to include all internal versions diff --git a/kafka/protocol/schemas/fields/array.py b/kafka/protocol/schemas/fields/array.py index e63367eff..2b9cad9e1 100644 --- a/kafka/protocol/schemas/fields/array.py +++ b/kafka/protocol/schemas/fields/array.py @@ -118,5 +118,10 @@ def decode(self, data, version=None, compact=False, tagged=False): return [self.array_of.decode(data, version=version, compact=compact, tagged=tagged) for _ in range(size)] + def to_json(self, val): + if val is None: + return None + return [self.array_of.to_json(i) for i in val] + def __repr__(self): return 'ArrayField(%s)' % self._json diff --git a/kafka/protocol/schemas/fields/simple.py b/kafka/protocol/schemas/fields/simple.py index 595938dab..c8982af84 100644 --- a/kafka/protocol/schemas/fields/simple.py +++ b/kafka/protocol/schemas/fields/simple.py @@ -104,5 +104,21 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t def decode(self, data, version=None, compact=False, tagged=False): return self._type.decode(data, compact=compact) + def to_json(self, val): + if val is None: + return None + if self._type is UUID: + return str(val) + elif self._type is Bytes: + if not isinstance(val, (bytes, bytearray, memoryview)): + val = val.encode() + if not isinstance(val, memoryview): + val = val.tobytes() + return val.decode(errors='backslashreplace') + elif self._type is BitField: + return list(val) + else: + return val + def __repr__(self): return 'SimpleField(%s)' % self._json