Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions src/datacustomcode/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

from loguru import logger
import pydantic
from pydantic import BaseModel
from pydantic import BaseModel, model_validator
import requests

from datacustomcode.cmd import cmd_output
Expand Down Expand Up @@ -376,13 +376,37 @@ class FunctionConfig(BaseConfig):
pass


class DloPermission(BaseModel):
    """Permission entry whose object names are listed under the ``dlo`` key."""

    # Names of the objects this permission refers to.
    dlo: list[str]


class DmoPermission(BaseModel):
    """Permission entry whose object names are listed under the ``dmo`` key."""

    # Names of the objects this permission refers to.
    dmo: list[str]


class Permissions(BaseModel):
    """Read/write permission pair of a data transform config.

    Each side is either a ``DloPermission`` or a ``DmoPermission``. The two
    sides must be of the same kind; a mix is rejected during validation.
    """

    # Each field is annotated exactly once. Annotating the same name a second
    # time silently replaces the first annotation, so a narrower DLO-only
    # declaration alongside these would be dead code.
    read: Union[DloPermission, DmoPermission]
    write: Union[DloPermission, DmoPermission]

    @model_validator(mode="after")
    def _no_mixed_layers(self) -> "Permissions":
        """Reject a config whose read and write sides use different layers.

        Returns:
            The validated model itself, unchanged.

        Raises:
            ValueError: If exactly one of ``read`` / ``write`` is a
                ``DloPermission``.
        """
        read_is_dlo = isinstance(self.read, DloPermission)
        write_is_dlo = isinstance(self.write, DloPermission)
        # The flags differ only when one side is DLO and the other is not.
        if read_is_dlo != write_is_dlo:
            raise ValueError(
                "permissions.read and permissions.write must both reference "
                "DLOs or both reference DMOs (got "
                f"read={type(self.read).__name__}, "
                f"write={type(self.write).__name__})"
            )
        return self


def _permission_entries(perm: Union[DloPermission, DmoPermission]) -> list[str]:
    """Return the list of object names regardless of layer (DLO or DMO).

    Args:
        perm: The permission entry to read names from.

    Returns:
        ``perm.dlo`` when ``perm`` is a ``DloPermission``, otherwise
        ``perm.dmo``.
    """
    # ``DloPermission`` must be the same class object that ``Permissions`` was
    # declared with. Re-declaring the class between the two would make this
    # check miss existing instances and fall through to ``.dmo``.
    if isinstance(perm, DloPermission):
        return perm.dlo
    return perm.dmo


def get_config(directory: str) -> BaseConfig:
Expand All @@ -404,10 +428,17 @@ def get_config(directory: str) -> BaseConfig:
except json.JSONDecodeError as err:
raise ValueError(f"config.json at {config_path} is not valid JSON") from err
except pydantic.ValidationError as err:
missing_fields = [str(err["loc"][0]) for err in err.errors()]
errors = err.errors()
missing = [e for e in errors if e.get("type") == "missing"]
if missing and len(missing) == len(errors):
missing_fields = [str(e["loc"][0]) for e in missing]
raise ValueError(
f"config.json at {config_path} is missing required "
f"fields: {', '.join(missing_fields)}"
) from err
messages = [str(e.get("msg", "")) for e in errors]
raise ValueError(
f"config.json at {config_path} is missing required "
f"fields: {', '.join(missing_fields)}"
f"config.json at {config_path} is invalid: {'; '.join(messages)}"
) from err


Expand All @@ -421,17 +452,21 @@ def create_data_transform(
script_name = metadata.name
request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE.copy()

# Add nodes for each write DLO
for i, dlo in enumerate(data_transform_config.permissions.write.dlo, 1):
# Add nodes for each write entry (DLO or DMO)
for i, name in enumerate(
_permission_entries(data_transform_config.permissions.write), 1
):
request_hydrated["nodes"][f"node{i}"] = {
"relation_name": dlo,
"relation_name": name,
"config": {"materialized": "table"},
"compiled_code": "",
}

# Add sources for each read DLO
for i, dlo in enumerate(data_transform_config.permissions.read.dlo, 1):
request_hydrated["sources"][f"source{i}"] = {"relation_name": dlo}
# Add sources for each read entry (DLO or DMO)
for i, name in enumerate(
_permission_entries(data_transform_config.permissions.read), 1
):
request_hydrated["sources"][f"source{i}"] = {"relation_name": name}

request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name

Expand Down
159 changes: 150 additions & 9 deletions tests/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from datacustomcode.deploy import (
DloPermission,
DmoPermission,
Permissions,
get_config,
)
Expand Down Expand Up @@ -934,6 +935,56 @@ def test_verify_data_transform_config_missing_fields(self, mock_file, mock_exist
):
get_config("/test/dir/payload")

@patch(
    "builtins.open",
    new_callable=mock_open,
    read_data=(
        '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", '
        '"dataspace": "test_dataspace", '
        '"permissions": {"read": {"dmo": ["input_dmo__dlm"]}, '
        '"write": {"dmo": ["output_dmo__dlm"]}}}'
    ),
)
def test_get_config_dmo_permissions(self, mock_file):
    """A config.json using only ``dmo`` keys yields DmoPermission on both sides."""
    config = get_config("/test/dir")
    assert isinstance(config, DataTransformConfig)

    # Expected object names per side of the permissions block.
    expected_names = {
        "read": ["input_dmo__dlm"],
        "write": ["output_dmo__dlm"],
    }
    for side, names in expected_names.items():
        permission = getattr(config.permissions, side)
        assert isinstance(permission, DmoPermission)
        assert permission.dmo == names

@patch(
    "builtins.open",
    new_callable=mock_open,
    read_data=(
        '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", '
        '"dataspace": "test_dataspace", '
        '"permissions": {"read": {"dlo": ["input_dlo"]}, '
        '"write": {"dmo": ["output_dmo__dlm"]}}}'
    ),
)
def test_get_config_mixed_dlo_dmo_raises(self, mock_file):
    """Reading a DLO while writing a DMO must surface as a ValueError."""
    with pytest.raises(ValueError) as raised:
        get_config("/test/dir")

    # The error text is expected to mention both sides of the permissions.
    message = str(raised.value)
    assert all(side in message for side in ("read", "write"))

@patch(
    "builtins.open",
    new_callable=mock_open,
    read_data=(
        '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", '
        '"dataspace": "test_dataspace", '
        '"permissions": {"read": {}, "write": {"dlo": ["output_dlo"]}}}'
    ),
)
def test_get_config_empty_permission_raises(self, mock_file):
    """An empty ``read`` block (no ``dlo`` and no ``dmo`` key) is an error."""
    config_dir = "/test/dir"
    # Only the exception type is checked here, not its message.
    with pytest.raises(ValueError):
        get_config(config_dir)


class TestCreateDataTransform:
@patch("datacustomcode.deploy.get_config")
Expand Down Expand Up @@ -972,18 +1023,108 @@ def test_create_data_transform(self, mock_make_api_call, mock_get_config):
request_body = mock_make_api_call.call_args[1]["json"]
assert request_body["definition"]["type"] == "DCSQL"
assert request_body["dataSpaceName"] == "test_dataspace"
assert "nodes" in request_body["definition"]["manifest"]
assert "sources" in request_body["definition"]["manifest"]
assert "macros" in request_body["definition"]["manifest"]
assert (
request_body["definition"]["manifest"]["macros"]["macro.byoc"]["arguments"][
0
]["name"]
== "test_job"
)
manifest = request_body["definition"]["manifest"]
assert manifest["nodes"] == {
"node1": {
"relation_name": "output_dlo",
"config": {"materialized": "table"},
"compiled_code": "",
}
}
assert manifest["sources"] == {"source1": {"relation_name": "input_dlo"}}
assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "test_job"

assert result == {"id": "transform_id"}

@patch("datacustomcode.deploy.get_config")
@patch("datacustomcode.deploy._make_api_call")
def test_create_data_transform_dmo(self, mock_make_api_call, mock_get_config):
    """Names from DMO permissions end up as node/source relation names."""
    mock_make_api_call.return_value = {"id": "transform_id"}

    token = AccessTokenResponse(
        access_token="test_token", instance_url="https://instance.example.com"
    )
    job_metadata = CodeExtensionMetadata(
        name="dmo_job",
        version="1.0.0",
        description="DMO job",
        computeType="CPU_M",
        codeType="script",
    )
    permissions = Permissions(
        read=DmoPermission(dmo=["input_dmo__dlm"]),
        write=DmoPermission(dmo=["output_dmo__dlm"]),
    )
    config = DataTransformConfig(
        sdkVersion="1.0.0",
        entryPoint="entrypoint.py",
        dataspace="test_dataspace",
        permissions=permissions,
    )

    create_data_transform("/test/dir", token, job_metadata, config)

    # Inspect the JSON payload handed to the (mocked) API call.
    body = mock_make_api_call.call_args[1]["json"]
    manifest = body["definition"]["manifest"]

    expected_nodes = {
        "node1": {
            "relation_name": "output_dmo__dlm",
            "config": {"materialized": "table"},
            "compiled_code": "",
        }
    }
    expected_sources = {"source1": {"relation_name": "input_dmo__dlm"}}

    assert manifest["nodes"] == expected_nodes
    assert manifest["sources"] == expected_sources
    assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "dmo_job"
    assert body["dataSpaceName"] == "test_dataspace"

@patch("datacustomcode.deploy.get_config")
@patch("datacustomcode.deploy._make_api_call")
def test_create_data_transform_multiple_dmos(
    self, mock_make_api_call, mock_get_config
):
    """Two read DMOs produce two numbered sources; the single write DMO one node."""
    mock_make_api_call.return_value = {"id": "transform_id"}

    read_names = ["in1__dlm", "in2__dlm"]
    write_names = ["out__dlm"]

    token = AccessTokenResponse(
        access_token="test_token", instance_url="https://instance.example.com"
    )
    job_metadata = CodeExtensionMetadata(
        name="dmo_multi",
        version="1.0.0",
        description="DMO multi",
        computeType="CPU_M",
        codeType="script",
    )
    config = DataTransformConfig(
        sdkVersion="1.0.0",
        entryPoint="entrypoint.py",
        dataspace="test_dataspace",
        permissions=Permissions(
            read=DmoPermission(dmo=read_names),
            write=DmoPermission(dmo=write_names),
        ),
    )

    create_data_transform("/test/dir", token, job_metadata, config)

    # Inspect the JSON payload handed to the (mocked) API call.
    body = mock_make_api_call.call_args[1]["json"]
    manifest = body["definition"]["manifest"]

    # Sources are keyed source1, source2, ... in the order the names were given.
    assert manifest["sources"] == {
        "source1": {"relation_name": "in1__dlm"},
        "source2": {"relation_name": "in2__dlm"},
    }
    assert manifest["nodes"] == {
        "node1": {
            "relation_name": "out__dlm",
            "config": {"materialized": "table"},
            "compiled_code": "",
        }
    }


class TestDeployFull:
@patch("datacustomcode.deploy.get_config")
Expand Down
Loading