Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def main():
dep_sdk = "tencentcloud-sdk-python >= %s" % __version__.rsplit(".", 1)[0]
setup(
name='tccli',
install_requires=[dep_sdk, "jmespath==0.10.0", "six==1.16.0"],
install_requires=[dep_sdk, "jmespath==0.10.0", "six==1.16.0", "cos-python-sdk-v5>=1.9.0"],
version=__version__,
packages=find_packages(),
include_package_data=True,
Expand Down
810 changes: 810 additions & 0 deletions tccli/plugins/cos/__init__.py

Large diffs are not rendered by default.

79 changes: 79 additions & 0 deletions tccli/plugins/cos/abort_multipart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""
abort 操作:清理未完成的分片上传
对齐 coscli abort 命令
"""
from qcloud_cos import CosServiceError
from .utils import init_cos_client


def abort_multipart(args, parsed_globals):
"""清理存储桶中未完成的分片上传"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]
prefix = args.get("prefix", "") or ""
upload_id = args.get("upload_id", "") or ""

try:
# 如果指定了 upload_id,则直接取消指定的分片上传
if upload_id:
cos_key = args.get("cos_key", "") or ""
if not cos_key:
print("Error: 指定 upload_id 时必须同时指定 cos_key")
return
client.abort_multipart_upload(
Bucket=bucket,
Key=cos_key,
UploadId=upload_id,
)
print("已取消分片上传: cos://%s/%s (UploadId: %s)" % (bucket, cos_key, upload_id))
return

# 否则列出并清理所有未完成的分片上传
key_marker = ""
upload_id_marker = ""
aborted = 0

while True:
response = client.list_multipart_uploads(
Bucket=bucket,
Prefix=prefix,
KeyMarker=key_marker,
UploadIdMarker=upload_id_marker,
MaxUploads=1000,
)

uploads = response.get("Upload", [])
if not isinstance(uploads, list):
uploads = [uploads]

for upload in uploads:
if not upload:
continue
key = upload.get("Key", "")
uid = upload.get("UploadId", "")
initiated = upload.get("Initiated", "")

client.abort_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=uid,
)
aborted += 1
print("已取消: cos://%s/%s (UploadId: %s, Initiated: %s)" % (bucket, key, uid, initiated))

if response.get("IsTruncated") == "true":
key_marker = response.get("NextKeyMarker", "")
upload_id_marker = response.get("NextUploadIdMarker", "")
else:
break

if aborted == 0:
print("没有未完成的分片上传")
else:
print("\n共清理 %d 个未完成的分片上传" % aborted)

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))
140 changes: 140 additions & 0 deletions tccli/plugins/cos/acl_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
"""
ACL 操作:获取和设置存储桶/对象的访问控制
对齐 coscli 的 ACL 管理能力
"""
from qcloud_cos import CosServiceError
from .utils import init_cos_client


def get_bucket_acl(args, parsed_globals):
"""获取存储桶的访问控制列表"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]

try:
response = client.get_bucket_acl(Bucket=bucket)

owner = response.get("Owner", {})
print("存储桶 ACL: %s" % bucket)
print("-" * 60)
print("Owner ID: %s" % owner.get("ID", ""))
print("Owner DisplayName: %s" % owner.get("DisplayName", ""))

grants = response.get("AccessControlList", {}).get("Grant", [])
if not isinstance(grants, list):
grants = [grants]

print("\n授权列表:")
for grant in grants:
grantee = grant.get("Grantee", {})
permission = grant.get("Permission", "")
grantee_id = grantee.get("ID", "")
grantee_type = grantee.get("type", "")
grantee_uri = grantee.get("URI", "")
if grantee_uri:
print(" - URI: %-40s Permission: %s" % (grantee_uri, permission))
else:
print(" - ID: %-40s Type: %-15s Permission: %s" % (grantee_id, grantee_type, permission))

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))


def put_bucket_acl(args, parsed_globals):
"""设置存储桶的访问控制"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]
acl = args.get("acl", "") or ""
grant_read = args.get("grant_read", "") or ""
grant_write = args.get("grant_write", "") or ""
grant_full_control = args.get("grant_full_control", "") or ""

try:
kwargs = {"Bucket": bucket}
if acl:
kwargs["ACL"] = acl
if grant_read:
kwargs["GrantRead"] = grant_read
if grant_write:
kwargs["GrantWrite"] = grant_write
if grant_full_control:
kwargs["GrantFullControl"] = grant_full_control

client.put_bucket_acl(**kwargs)
print("存储桶 ACL 设置成功: %s" % bucket)
if acl:
print("ACL: %s" % acl)

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))


def get_object_acl(args, parsed_globals):
"""获取对象的访问控制列表"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]
cos_key = args["cos_key"]

try:
response = client.get_object_acl(Bucket=bucket, Key=cos_key)

owner = response.get("Owner", {})
print("对象 ACL: cos://%s/%s" % (bucket, cos_key))
print("-" * 60)
print("Owner ID: %s" % owner.get("ID", ""))
print("Owner DisplayName: %s" % owner.get("DisplayName", ""))

grants = response.get("AccessControlList", {}).get("Grant", [])
if not isinstance(grants, list):
grants = [grants]

print("\n授权列表:")
for grant in grants:
grantee = grant.get("Grantee", {})
permission = grant.get("Permission", "")
grantee_id = grantee.get("ID", "")
grantee_type = grantee.get("type", "")
grantee_uri = grantee.get("URI", "")
if grantee_uri:
print(" - URI: %-40s Permission: %s" % (grantee_uri, permission))
else:
print(" - ID: %-40s Type: %-15s Permission: %s" % (grantee_id, grantee_type, permission))

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))


def put_object_acl(args, parsed_globals):
"""设置对象的访问控制"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]
cos_key = args["cos_key"]
acl = args.get("acl", "") or ""
grant_read = args.get("grant_read", "") or ""
grant_full_control = args.get("grant_full_control", "") or ""

try:
kwargs = {"Bucket": bucket, "Key": cos_key}
if acl:
kwargs["ACL"] = acl
if grant_read:
kwargs["GrantRead"] = grant_read
if grant_full_control:
kwargs["GrantFullControl"] = grant_full_control

client.put_object_acl(**kwargs)
print("对象 ACL 设置成功: cos://%s/%s" % (bucket, cos_key))
if acl:
print("ACL: %s" % acl)

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))
62 changes: 62 additions & 0 deletions tccli/plugins/cos/cat_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
cat 操作:查看 COS 对象内容
对齐 coscli cat 命令
"""
from qcloud_cos import CosServiceError
from .utils import init_cos_client, format_size


def cat_object(args, parsed_globals):
"""查看 COS 对象的内容"""
client, region = init_cos_client(parsed_globals)

bucket = args["bucket"]
cos_key = args["cos_key"]
byte_range = args.get("range", "") or ""
max_size = args.get("max_size", 10) or 10 # 默认最大 10MB

try:
# 先获取对象大小
head_response = client.head_object(
Bucket=bucket,
Key=cos_key,
)
content_length = int(head_response.get("Content-Length", 0))

# 检查文件大小限制
max_bytes = int(max_size) * 1024 * 1024
if content_length > max_bytes and not byte_range:
print("Warning: 文件大小 %s 超过限制 (%dMB),仅显示前 %dMB" % (
format_size(content_length), max_size, max_size))
byte_range = "bytes=0-%d" % (max_bytes - 1)

# 获取对象内容
kwargs = {
"Bucket": bucket,
"Key": cos_key,
}
if byte_range:
kwargs["Range"] = byte_range

response = client.get_object(**kwargs)
body = response["Body"]

# 读取并输出内容
content = body.get_raw_stream().read()
try:
text = content.decode("utf-8")
print(text)
except UnicodeDecodeError:
print("Warning: 文件内容不是 UTF-8 编码的文本,显示为十六进制")
hex_str = content[:1024].hex()
for i in range(0, len(hex_str), 64):
print(hex_str[i:i + 64])
if len(content) > 1024:
print("... (共 %d 字节)" % len(content))

except CosServiceError as e:
print("Error: %s (Code: %s, RequestId: %s)" % (
e.get_error_msg(), e.get_error_code(), e.get_request_id()))
except Exception as e:
print("Error: %s" % str(e))
Loading