Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Tests for convertToCapped command - collection type handling."""

import pytest

from documentdb_tests.compatibility.tests.core.collections.commands.utils.command_test_case import (
CommandContext,
CommandTestCase,
)
from documentdb_tests.framework.assertions import assertResult
from documentdb_tests.framework.error_codes import (
COMMAND_NOT_SUPPORTED_ON_VIEW_ERROR,
NAMESPACE_NOT_FOUND_ERROR,
)
from documentdb_tests.framework.executor import execute_command
from documentdb_tests.framework.parametrize import pytest_params
from documentdb_tests.framework.target_collection import (
CappedCollection,
ClusteredCollection,
TimeseriesCollection,
ViewCollection,
)

# Property [Collection Type Success]: convertToCapped returns ok:1.0 on success for
# various collection states including empty, populated, and clustered.
COLLECTION_TYPE_SUCCESS_TESTS: list[CommandTestCase] = [
CommandTestCase(
"empty_collection",
docs=[],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
expected={"ok": 1.0},
msg="Empty collection should return ok:1.0",
),
CommandTestCase(
"populated_collection",
docs=[{"_id": 1}],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
expected={"ok": 1.0},
msg="Populated collection should return ok:1.0",
),
CommandTestCase(
"clustered_collection",
target_collection=ClusteredCollection(),
docs=[{"_id": 1}],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
expected={"ok": 1.0},
msg="Clustered collection should return ok:1.0",
),
]

# Property [Data Truncation]: when the cap size is smaller than the existing
# data, the command still succeeds.
DATA_TRUNCATION_TESTS: list[CommandTestCase] = [
CommandTestCase(
"cap_smaller_than_data",
docs=[{"_id": i, "x": "a" * 100} for i in range(10)],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 256},
expected={"ok": 1.0},
msg="Cap smaller than existing data should succeed",
),
]

# Property [Already Capped]: converting an already-capped collection succeeds
# regardless of whether the new size is smaller, equal, or larger.
ALREADY_CAPPED_TESTS: list[CommandTestCase] = [
CommandTestCase(
"smaller_size",
target_collection=CappedCollection(size=4096),
docs=[{"_id": 1}],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 2048},
expected={"ok": 1.0},
msg="Smaller size on already-capped collection should succeed",
),
CommandTestCase(
"same_size",
target_collection=CappedCollection(size=4096),
docs=[{"_id": 1}],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
expected={"ok": 1.0},
msg="Same size on already-capped collection should succeed",
),
CommandTestCase(
"larger_size",
target_collection=CappedCollection(size=4096),
docs=[{"_id": 1}],
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 8192},
expected={"ok": 1.0},
msg="Larger size on already-capped collection should succeed",
),
]

# Property [Collection Existence Errors]: attempting to convert a
# non-existent collection produces a namespace-not-found error.
EXISTENCE_ERROR_TESTS: list[CommandTestCase] = [
CommandTestCase(
"nonexistent_collection",
docs=None,
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
error_code=NAMESPACE_NOT_FOUND_ERROR,
msg="Non-existent collection should produce a namespace-not-found error",
),
]

# Property [Collection Type Errors]: views and timeseries collections
# produce a command-not-supported-on-view error.
COLLECTION_TYPE_ERROR_TESTS: list[CommandTestCase] = [
CommandTestCase(
"view",
target_collection=ViewCollection(),
docs=None,
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
error_code=COMMAND_NOT_SUPPORTED_ON_VIEW_ERROR,
msg="View should produce a command-not-supported-on-view error",
),
CommandTestCase(
"timeseries",
target_collection=TimeseriesCollection(),
docs=None,
command=lambda ctx: {"convertToCapped": ctx.collection, "size": 4096},
error_code=COMMAND_NOT_SUPPORTED_ON_VIEW_ERROR,
msg="Timeseries collection should produce a command-not-supported-on-view error",
),
]

CONVERT_TO_CAPPED_COLLECTION_TYPE_TESTS: list[CommandTestCase] = (
COLLECTION_TYPE_SUCCESS_TESTS
+ DATA_TRUNCATION_TESTS
+ ALREADY_CAPPED_TESTS
+ EXISTENCE_ERROR_TESTS
+ COLLECTION_TYPE_ERROR_TESTS
)


@pytest.mark.collection_mgmt
@pytest.mark.parametrize("test", pytest_params(CONVERT_TO_CAPPED_COLLECTION_TYPE_TESTS))
def test_convert_to_capped_collection_types(database_client, collection, test):
"""Test convertToCapped command behavior for various collection types."""
collection = test.prepare(database_client, collection)
ctx = CommandContext.from_collection(collection)
result = execute_command(collection, test.build_command(ctx))
assertResult(
result,
expected=test.build_expected(ctx),
error_code=test.error_code,
msg=test.msg,
raw_res=True,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Tests for convertToCapped command - ancillary command field handling."""

import datetime

import pytest
from bson import (
Binary,
Code,
DBRef,
Decimal128,
Int64,
MaxKey,
MinKey,
ObjectId,
Regex,
Timestamp,
)

from documentdb_tests.compatibility.tests.core.collections.commands.utils.command_test_case import (
CommandContext,
CommandTestCase,
)
from documentdb_tests.framework.assertions import assertResult
from documentdb_tests.framework.executor import execute_command
from documentdb_tests.framework.parametrize import pytest_params
from documentdb_tests.framework.test_constants import DECIMAL128_ONE_AND_HALF

# Property [Comment Type Acceptance]: the comment field accepts any BSON
# type without affecting command success.
COMMENT_TYPE_ACCEPTANCE_TESTS: list[CommandTestCase] = [
CommandTestCase(
f"comment_{id}",
docs=[{"_id": 1}],
command=lambda ctx, v=val: {
"convertToCapped": ctx.collection,
"size": 4096,
"comment": v,
},
expected={"ok": 1.0},
msg=f"comment={id} should be accepted",
)
for id, val in [
("string", "hello"),
("int32", 42),
("int64", Int64(99)),
("double", 3.14),
("decimal128", DECIMAL128_ONE_AND_HALF),
("bool", True),
("null", None),
("array", [1, 2, 3]),
("object", {"key": "value"}),
("objectid", ObjectId("507f1f77bcf86cd799439011")),
("binary", Binary(b"hello")),
("regex", Regex("pattern", "i")),
("code", Code("function() {}")),
("code_with_scope", Code("function() {}", {"x": 1})),
("minkey", MinKey()),
("maxkey", MaxKey()),
("timestamp", Timestamp(1, 1)),
("datetime", datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)),
("dbref", DBRef("coll", "id")),
]
]

# Property [Unknown Command Fields]: unrecognized top-level command fields
# are silently accepted without error.
UNKNOWN_FIELD_TESTS: list[CommandTestCase] = [
CommandTestCase(
"single_unknown_field",
docs=[{"_id": 1}],
command=lambda ctx: {
"convertToCapped": ctx.collection,
"size": 4096,
"unknownField": "hello",
},
expected={"ok": 1.0},
msg="Single unknown field should be silently accepted",
),
CommandTestCase(
"multiple_unknown_fields",
docs=[{"_id": 1}],
command=lambda ctx: {
"convertToCapped": ctx.collection,
"size": 4096,
"foo": 42,
"bar": [1, 2, 3],
"baz": {"nested": True},
},
expected={"ok": 1.0},
msg="Multiple unknown fields should be silently accepted",
),
]

# Property [bypassDocumentValidation Type Acceptance]: bypassDocumentValidation
# is accepted as a recognized command field with any BSON type without error.
BYPASS_DOC_VALIDATION_TESTS: list[CommandTestCase] = [
CommandTestCase(
f"bypass_{id}",
docs=[{"_id": 1}],
command=lambda ctx, v=val: {
"convertToCapped": ctx.collection,
"size": 4096,
"bypassDocumentValidation": v,
},
expected={"ok": 1.0},
msg=f"bypassDocumentValidation={id} should be accepted",
)
for id, val in [
("string", "hello"),
("int32", 42),
("int64", Int64(42)),
("double", 3.14),
("decimal128", Decimal128("3.14")),
("bool_true", True),
("bool_false", False),
("null", None),
("array", [1, 2, 3]),
("object", {"key": "value"}),
("objectid", ObjectId("507f1f77bcf86cd799439011")),
("datetime", datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)),
("timestamp", Timestamp(1, 1)),
("binary", Binary(b"hello")),
("regex", Regex("pattern", "i")),
("code", Code("function() {}")),
("code_with_scope", Code("function() {}", {"x": 1})),
("minkey", MinKey()),
("maxkey", MaxKey()),
]
]

CONVERT_TO_CAPPED_COMMAND_FIELD_TESTS: list[CommandTestCase] = (
COMMENT_TYPE_ACCEPTANCE_TESTS + UNKNOWN_FIELD_TESTS + BYPASS_DOC_VALIDATION_TESTS
)


@pytest.mark.collection_mgmt
@pytest.mark.parametrize("test", pytest_params(CONVERT_TO_CAPPED_COMMAND_FIELD_TESTS))
def test_convert_to_capped_command_fields(database_client, collection, test):
"""Test convertToCapped command ancillary field handling."""
collection = test.prepare(database_client, collection)
ctx = CommandContext.from_collection(collection)
result = execute_command(collection, test.build_command(ctx))
assertResult(
result,
expected=test.build_expected(ctx),
error_code=test.error_code,
msg=test.msg,
raw_res=True,
)
Loading
Loading