Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def run_cli(args=None):
if config.format == 'raw':
pprint(result)
elif config.format == 'json':
if hasattr(result, 'to_dict'):
result = result.to_dict()
print(json.dumps(result))
return 0
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions kafka/cli/admin/topics/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ class DeleteTopic:
@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('delete', help='Delete Kafka Topic')
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='topic name')
parser.add_argument('--id', type=str, action='append', dest='topic_ids', help='topic UUID')
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[], help='topic name')
parser.add_argument('--id', type=str, action='append', dest='topic_ids', default=[], help='topic UUID')
parser.set_defaults(command=cls.command)

@classmethod
Expand Down
7 changes: 1 addition & 6 deletions kafka/protocol/data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,7 @@ def _to_dict_vals(self, meta=False, json=True):
yield (field.name, [dict(val._to_dict_vals(meta=meta, json=json)) for val in getattr(self, field.name)])
else:
val = getattr(self, field.name)
if json:
if isinstance(val, bytes):
val = val.decode()
elif isinstance(val, set):
val = list(val)
yield (field.name, val)
yield (field.name, field.to_json(val) if json else val)

def to_dict(self, meta=False, json=True):
"""Use meta=True to include top-level version; meta='all' to include all internal versions
Expand Down
5 changes: 5 additions & 0 deletions kafka/protocol/schemas/fields/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,10 @@ def decode(self, data, version=None, compact=False, tagged=False):
return [self.array_of.decode(data, version=version, compact=compact, tagged=tagged)
for _ in range(size)]

def to_json(self, val):
if val is None:
return None
return [self.array_of.to_json(i) for i in val]

def __repr__(self):
return 'ArrayField(%s)' % self._json
16 changes: 16 additions & 0 deletions kafka/protocol/schemas/fields/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,21 @@ def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, t
def decode(self, data, version=None, compact=False, tagged=False):
return self._type.decode(data, compact=compact)

def to_json(self, val):
if val is None:
return None
if self._type is UUID:
return str(val)
elif self._type is Bytes:
if not isinstance(val, (bytes, bytearray, memoryview)):
val = val.encode()
if not isinstance(val, memoryview):
val = val.tobytes()
return val.decode(errors='backslashreplace')
elif self._type is BitField:
return list(val)
else:
return val

def __repr__(self):
return 'SimpleField(%s)' % self._json
Loading