diff --git a/acceptance/bin/print_requests.py b/acceptance/bin/print_requests.py index bd4ab1eb381..9798027b05c 100755 --- a/acceptance/bin/print_requests.py +++ b/acceptance/bin/print_requests.py @@ -168,7 +168,7 @@ def main(): if not requests_file.exists(): sys.exit(f"File {requests_file.as_posix()} not found") - with open(requests_file) as fobj: + with open(requests_file, encoding="utf-8") as fobj: data = fobj.read() if not data: diff --git a/acceptance/bundle/dms/add-resources/databricks.yml b/acceptance/bundle/dms/add-resources/databricks.yml new file mode 100644 index 00000000000..6914816970d --- /dev/null +++ b/acceptance/bundle/dms/add-resources/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: add-resources-test + +resources: + jobs: + job_a: + name: job-a diff --git a/acceptance/bundle/dms/add-resources/out.test.toml b/acceptance/bundle/dms/add-resources/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/add-resources/output.txt b/acceptance/bundle/dms/add-resources/output.txt new file mode 100644 index 00000000000..eaa62eb6eb4 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/output.txt @@ -0,0 +1,149 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/add-resources-test/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/add-resources-test/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +update jobs.job_a +update jobs.job_b + +Plan: 0 to add, 2 to change, 0 to delete, 0 unchanged + +>>> print_requests.py --get --sort //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "display_name": "add-resources-test", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "add-resources-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "add-resources-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.job_a" + }, + "body": { + "resource_key": "jobs.job_a", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/add-resources-test/default/state/metadata.json", + "version_id": "1" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-a", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "name": "deployments/[UUID]/versions/2", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.job_b" + }, + "body": { + "resource_key": "jobs.job_b", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/add-resources-test/default/state/metadata.json", + "version_id": "2" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "job-b", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/dms/add-resources/script b/acceptance/bundle/dms/add-resources/script new file mode 100644 index 00000000000..8b1e75e1f3c --- /dev/null +++ b/acceptance/bundle/dms/add-resources/script @@ -0,0 +1,34 @@ +# Deploy with one job. +trace $CLI bundle deploy + +# Delete local cache to force reading state from DMS. +rm -rf .databricks + +# Add a second job and deploy again. +cat > databricks.yml << 'EOF' +bundle: + name: add-resources-test + +resources: + jobs: + job_a: + name: job-a + job_b: + name: job-b +EOF +trace $CLI bundle deploy + +# Delete local cache again and run plan — should show no changes. +rm -rf .databricks +trace $CLI bundle plan + +# Print metadata service requests. Should show: +# - Deploy 1: CREATE for job_a +# - Deploy 2: ListResources + CREATE for job_b (job_a is unchanged) +# - Plan: ListResources (no operations) +trace print_requests.py --get --sort //bundle ^//workspace-files ^//import-file + +# Clean up. +rm -rf .databricks +$CLI bundle destroy --auto-approve > /dev/null 2>&1 +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/add-resources/test.toml b/acceptance/bundle/dms/add-resources/test.toml new file mode 100644 index 00000000000..601384fdf96 --- /dev/null +++ b/acceptance/bundle/dms/add-resources/test.toml @@ -0,0 +1 @@ +Ignore = [".databricks"] diff --git a/acceptance/bundle/dms/deploy-error/databricks.yml b/acceptance/bundle/dms/deploy-error/databricks.yml new file mode 100644 index 00000000000..4786eeddf7e --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: metadata-service-error-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/deploy-error/out.test.toml b/acceptance/bundle/dms/deploy-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/deploy-error/output.txt b/acceptance/bundle/dms/deploy-error/output.txt new file mode 100644 index 00000000000..1706c0fac08 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/output.txt @@ -0,0 +1,74 @@ + +>>> musterr [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/metadata-service-error-test/default/files... +Deploying resources... +Error: cannot create resources.jobs.test_job: Invalid job configuration. (400 INVALID_PARAMETER_VALUE) + +Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create +HTTP Status: 400 Bad Request +API error_code: INVALID_PARAMETER_VALUE +API message: Invalid job configuration. + + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "display_name": "metadata-service-error-test", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "metadata-service-error-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "resource_key": "jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/metadata-service-error-test/default/state/metadata.json", + "version_id": "1" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "status": "OPERATION_STATUS_FAILED", + "error_message": "Invalid job configuration." + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_FAILURE" + } +} diff --git a/acceptance/bundle/dms/deploy-error/script b/acceptance/bundle/dms/deploy-error/script new file mode 100644 index 00000000000..28a0e4501f3 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/script @@ -0,0 +1,6 @@ +# Deploy with the metadata service enabled, expecting a resource creation failure. +trace musterr $CLI bundle deploy + +# Print the metadata service requests to verify the failed operation is reported. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/deploy-error/test.toml b/acceptance/bundle/dms/deploy-error/test.toml new file mode 100644 index 00000000000..70b16ea757c --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/test.toml @@ -0,0 +1,9 @@ +Ignore = [".databricks"] +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] +RecordRequests = true + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.StatusCode = 400 +Response.Body = '{"error_code": "INVALID_PARAMETER_VALUE", "message": "Invalid job configuration."}' diff --git a/acceptance/bundle/dms/plan-and-summary/databricks.yml b/acceptance/bundle/dms/plan-and-summary/databricks.yml new file mode 100644 index 00000000000..57120c0b94d --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: plan-summary-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/plan-and-summary/out.test.toml b/acceptance/bundle/dms/plan-and-summary/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/plan-and-summary/output.txt b/acceptance/bundle/dms/plan-and-summary/output.txt new file mode 100644 index 00000000000..cac857d2eea --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/output.txt @@ -0,0 +1,101 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/plan-summary-test/default/files... +Deploying resources... +Deployment complete! + +>>> [CLI] bundle plan +update jobs.test_job + +Plan: 0 to add, 1 to change, 0 to delete, 0 unchanged + +>>> [CLI] bundle summary +Name: plan-summary-test +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/plan-summary-test/default +Resources: + Jobs: + test_job: + Name: test-job + URL: [DATABRICKS_URL]/jobs/[NUMID]?w=[NUMID] + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "display_name": "plan-summary-test", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "plan-summary-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "resource_key": "jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/plan-summary-test/default/state/metadata.json", + "version_id": "1" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} diff --git a/acceptance/bundle/dms/plan-and-summary/script b/acceptance/bundle/dms/plan-and-summary/script new file mode 100644 index 00000000000..a662938cca9 --- /dev/null +++ b/acceptance/bundle/dms/plan-and-summary/script @@ -0,0 +1,16 @@ +# Deploy first to populate DMS state. +trace $CLI bundle deploy + +# Plan should read state from DMS via ListResources. +trace $CLI bundle plan + +# Summary should show the deployment ID and read state from DMS. +trace $CLI bundle summary + +# Print metadata service requests from plan and summary. +# Both should include ListResources calls. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file + +# Clean up. +$CLI bundle destroy --auto-approve > /dev/null 2>&1 +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 00000000000..94323b84d93 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 00000000000..935561f7bc1 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,69 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Deployment complete! +Warn: Failed to release deployment lock: simulated complete version failure + +>>> print_requests.py --get //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "display_name": "dms-release-lock-error", + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "dms-release-lock-error", + "target_name": "fail-complete", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "resource_key": "jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/state/metadata.json", + "version_id": "1" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100644 index 00000000000..a9c0de93c97 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,9 @@ +# Deploy with the metadata service enabled. +# The target name "fail-complete" triggers a simulated error on the +# CompleteVersion endpoint (release lock), so deploy should warn about +# the failed lock release. +trace $CLI bundle deploy + +# Print the metadata service requests to verify the lock release was attempted. +trace print_requests.py --get //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 00000000000..a721baa4f63 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,4 @@ +Ignore = [".databricks"] + +# Override target to "fail-complete" which makes the test server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/sequential-deploys/databricks.yml b/acceptance/bundle/dms/sequential-deploys/databricks.yml new file mode 100644 index 00000000000..0d7c1fb63b3 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: sequential-deploys-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/sequential-deploys/out.test.toml b/acceptance/bundle/dms/sequential-deploys/out.test.toml new file mode 100644 index 00000000000..9b50a81b196 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/sequential-deploys/output.txt b/acceptance/bundle/dms/sequential-deploys/output.txt new file mode 100644 index 00000000000..9982e9c4801 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/output.txt @@ -0,0 +1,191 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get --sort //bundle ^//workspace-files ^//import-file +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "display_name": "sequential-deploys-test", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "sequential-deploys-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "resource_key": "jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/state/metadata.json", + "version_id": "1" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "test-job", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get --sort //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "sequential-deploys-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "name": "deployments/[UUID]/versions/2", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "jobs.new_job" + }, + "body": { + "resource_key": "jobs.new_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "state": { + "deployment": { + "deployment_id": "[UUID]", + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/state/metadata.json", + "version_id": "2" + }, + "edit_mode": "UI_LOCKED", + "format": "MULTI_TASK", + "max_concurrent_runs": 1, + "name": "new-job", + "queue": { + "enabled": true + } + }, + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Deployment complete! + +>>> print_requests.py --get --sort //bundle ^//workspace-files ^//import-file +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]/resources", + "q": { + "page_size": "1000" + }, + "body": null +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "display_name": "sequential-deploys-test", + "target_name": "default", + "git_info": {} + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "name": "deployments/[UUID]/versions/3", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/operations", + "q": { + "resource_key": "jobs.test_job" + }, + "body": { + "resource_key": "jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_DELETE", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} diff --git a/acceptance/bundle/dms/sequential-deploys/script b/acceptance/bundle/dms/sequential-deploys/script new file mode 100644 index 00000000000..7276760bf06 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/script @@ -0,0 +1,32 @@ +# Deploy with one job. +trace $CLI bundle deploy +trace print_requests.py --get --sort //bundle ^//workspace-files ^//import-file + +# Add a second job and redeploy. +cat > databricks.yml << 'EOF' +bundle: + name: sequential-deploys-test + +resources: + jobs: + test_job: + name: test-job + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get --sort //bundle ^//workspace-files ^//import-file + +# Remove the first job and redeploy (should delete test_job). +cat > databricks.yml << 'EOF' +bundle: + name: sequential-deploys-test + +resources: + jobs: + new_job: + name: new-job +EOF +trace $CLI bundle deploy +trace print_requests.py --get --sort //bundle ^//workspace-files ^//import-file +print_requests.py --get > /dev/null 2>&1 || true diff --git a/acceptance/bundle/dms/sequential-deploys/test.toml b/acceptance/bundle/dms/sequential-deploys/test.toml new file mode 100644 index 00000000000..601384fdf96 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/test.toml @@ -0,0 +1 @@ +Ignore = [".databricks"] diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..5d95b8d05dd --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,4 @@ +Badness = "Uses local test server; enable on cloud once the deployment metadata service is in production" +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] +RecordRequests = true diff --git a/bundle/bundle.go b/bundle/bundle.go index 868510b15e6..3f717c8652d 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -9,6 +9,7 @@ package bundle import ( "context" "fmt" + "net/http" "os" "path/filepath" "sync" @@ -141,6 +142,16 @@ type Bundle struct { // (direct only) deployment implementation and state DeploymentBundle direct.DeploymentBundle + // DeploymentID is the DMS deployment identifier read from workspace state. + // Populated during state pull when the deployment metadata service is enabled. + DeploymentID string + + // DeploymentVersionID is the DMS version created for the current deploy. + // Populated when the deployment lock is acquired (DMS enabled only) and + // stamped onto job/pipeline resources so each resource records the version + // that produced it. + DeploymentVersionID string + // if true, we skip approval checks for deploy, destroy resources and delete // files AutoApprove bool @@ -231,6 +242,20 @@ func (b *Bundle) initClientOnce(ctx context.Context) { if err != nil { return nil, fmt.Errorf("cannot resolve bundle auth configuration: %w", err) } + + // If DATABRICKS_LITESWAP_ID is set, wrap the transport to inject the + // x-databricks-traffic-id header for routing to the liteswap instance. + // This env var is only set during manual testing so os.Getenv is fine. + if liteswapID := os.Getenv("DATABRICKS_LITESWAP_ID"); liteswapID != "" { //nolint:forbidigo + inner := w.Config.HTTPTransport + if inner == nil { + inner = http.DefaultTransport + } + w.Config.HTTPTransport = &liteswapTransport{ + inner: inner, + trafficID: "testenv://liteswap/" + liteswapID, + } + } return w, nil }) } @@ -251,6 +276,19 @@ func (b *Bundle) WorkspaceClient(ctx context.Context) *databricks.WorkspaceClien return client } +// liteswapTransport injects the x-databricks-traffic-id header to route +// requests to a liteswap service instance. +type liteswapTransport struct { + inner http.RoundTripper + trafficID string +} + +func (t *liteswapTransport) RoundTrip(req *http.Request) (*http.Response, error) { + clone := req.Clone(req.Context()) + clone.Header.Set("x-databricks-traffic-id", t.trafficID) + return t.inner.RoundTrip(clone) +} + // SetWorkpaceClient sets the workspace client for this bundle. // This is used to inject a mock client for testing. func (b *Bundle) SetWorkpaceClient(w *databricks.WorkspaceClient) { diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go deleted file mode 100644 index 6e4844ca5ff..00000000000 --- a/bundle/deploy/lock/acquire.go +++ /dev/null @@ -1,69 +0,0 @@ -package lock - -import ( - "context" - "errors" - "io/fs" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/permissions" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type acquire struct{} - -func Acquire() bundle.Mutator { - return &acquire{} -} - -func (m *acquire) Name() string { - return "lock:acquire" -} - -func (m *acquire) init(ctx context.Context, b *bundle.Bundle) error { - user := b.Config.Workspace.CurrentUser.UserName - dir := b.Config.Workspace.StatePath - l, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) - if err != nil { - return err - } - - b.Locker = l - return nil -} - -func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - err := m.init(ctx, b) - if err != nil { - return diag.FromErr(err) - } - - force := b.Config.Bundle.Deployment.Lock.Force - log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) - err = b.Locker.Lock(ctx, force) - if err != nil { - log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) - - if errors.Is(err, fs.ErrPermission) { - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - if errors.Is(err, fs.ErrNotExist) { - // If we get a "doesn't exist" error from the API this indicates - // we either don't have permissions or the path is invalid. - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - return diag.FromErr(err) - } - - return nil -} diff --git a/bundle/deploy/lock/async_reporter.go b/bundle/deploy/lock/async_reporter.go new file mode 100644 index 00000000000..e9875062725 --- /dev/null +++ b/bundle/deploy/lock/async_reporter.go @@ -0,0 +1,85 @@ +package lock + +import ( + "context" + "encoding/json" + + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/libs/log" +) + +// Matches direct.defaultParallelism so a hard process crash drops at most +// ~10 unsent operation events. +const asyncReporterBufferSize = 10 + +type operationEvent struct { + resourceKey string + resourceID string + action deployplan.ActionType + operationErr error + state json.RawMessage +} + +// asyncReporter dispatches DMS operation reports from a single sender +// goroutine fed by a buffered channel. CRUD workers push and continue; +// when the buffer is full the send blocks, applying backpressure to the +// worker pool. Reporting is best-effort — DMS API errors are logged and +// the sender keeps draining. +type asyncReporter struct { + ch chan operationEvent + done chan struct{} + sendFn func(ctx context.Context, ev operationEvent) error + ctx context.Context +} + +// newAsyncReporter starts the sender goroutine. ctx is used for all DMS API +// calls and must outlive individual worker contexts. +func newAsyncReporter(ctx context.Context, sendFn func(context.Context, operationEvent) error) *asyncReporter { + r := &asyncReporter{ + ch: make(chan operationEvent, asyncReporterBufferSize), + done: make(chan struct{}), + sendFn: sendFn, + ctx: ctx, + } + go r.run() + return r +} + +func (r *asyncReporter) run() { + defer close(r.done) + for ev := range r.ch { + if err := r.sendFn(r.ctx, ev); err != nil { + log.Warnf(r.ctx, "Failed to report %s operation for %s to DMS: %v", ev.action, ev.resourceKey, err) + } + } +} + +func (r *asyncReporter) Reporter() direct.OperationReporter { + return func( + ctx context.Context, + resourceKey, resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, + ) error { + select { + case r.ch <- operationEvent{ + resourceKey: resourceKey, + resourceID: resourceID, + action: action, + operationErr: operationErr, + state: state, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Close signals end-of-input and waits for the sender to drain. +func (r *asyncReporter) Close() { + close(r.ch) + <-r.done +} diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..5206a852eec --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,359 @@ +package lock + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "strings" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/statemgmt" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/cli/libs/tmpdms" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/google/uuid" +) + +const defaultHeartbeatInterval = 30 * time.Second + +type metadataServiceLock struct { + b *bundle.Bundle + versionType tmpdms.VersionType + + svc *tmpdms.DeploymentMetadataAPI + versionID string + + stopHeartbeat func() + reporter *asyncReporter +} + +func newMetadataServiceLock(b *bundle.Bundle, versionType tmpdms.VersionType) *metadataServiceLock { + return &metadataServiceLock{b: b, versionType: versionType} +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return errors.New("force lock is not supported with the deployment metadata service") + } + + svc, err := tmpdms.NewDeploymentMetadataAPI(l.b.WorkspaceClient(ctx)) + if err != nil { + return fmt.Errorf("failed to create metadata service client: %w", err) + } + l.svc = svc + + deploymentID, versionID, err := acquireLock(ctx, l.b, svc, l.versionType) + if err != nil { + return err + } + + l.b.DeploymentID = deploymentID + l.b.DeploymentVersionID = versionID + l.versionID = versionID + l.stopHeartbeat = startHeartbeat(ctx, svc, deploymentID, versionID) + l.reporter = newAsyncReporter(ctx, makeSyncReporter(svc, deploymentID, versionID)) + l.b.DeploymentBundle.OperationReporter = l.reporter.Reporter() + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + if l.reporter != nil { + l.reporter.Close() + } + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + reason := tmpdms.VersionCompleteSuccess + if status == DeploymentFailure { + reason = tmpdms.VersionCompleteFailure + } + + _, completeErr := l.svc.CompleteVersion(ctx, tmpdms.CompleteVersionRequest{ + DeploymentID: l.b.DeploymentID, + VersionID: l.versionID, + Name: fmt.Sprintf("deployments/%s/versions/%s", l.b.DeploymentID, l.versionID), + CompletionReason: reason, + }) + if completeErr != nil { + return completeErr + } + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", l.b.DeploymentID, l.versionID, reason) + + // For destroy operations, delete the deployment record after + // successfully releasing the lock. + if status == DeploymentSuccess && l.versionType == tmpdms.VersionTypeDestroy { + _, deleteErr := l.svc.DeleteDeployment(ctx, tmpdms.DeleteDeploymentRequest{ + DeploymentID: l.b.DeploymentID, + }) + if deleteErr != nil { + return fmt.Errorf("failed to delete deployment: %w", deleteErr) + } + } + + return nil +} + +// acquireLock implements the lock acquisition protocol using the deployment +// metadata service: resolve deployment ID, ensure deployment, create version. +func acquireLock(ctx context.Context, b *bundle.Bundle, svc *tmpdms.DeploymentMetadataAPI, versionType tmpdms.VersionType) (deploymentID, versionID string, err error) { + var isNew bool + deploymentID, isNew, err = resolveDeploymentID(ctx, b) + if err != nil { + return "", "", err + } + + if isNew { + // Fresh deployment: create the record and start at version 1. + _, createErr := svc.CreateDeployment(ctx, tmpdms.CreateDeploymentRequest{ + DeploymentID: deploymentID, + Deployment: &tmpdms.Deployment{ + DisplayName: b.Config.Bundle.Name, + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + // Write the deployment ID to workspace only after the server-side + // record is created. This avoids leaving a dangling ID if creation fails. + if err := writeDeploymentID(ctx, b, deploymentID); err != nil { + return "", "", err + } + versionID = "1" + } else { + // Existing deployment: get the last version ID to determine the next one. + dep, getErr := svc.GetDeployment(ctx, tmpdms.GetDeploymentRequest{ + DeploymentID: deploymentID, + }) + if getErr != nil { + return "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + lastVersion, parseErr := strconv.ParseInt(dep.LastVersionID, 10, 64) + if parseErr != nil { + return "", "", fmt.Errorf("failed to parse last_version_id %q: %w", dep.LastVersionID, parseErr) + } + versionID = strconv.FormatInt(lastVersion+1, 10) + } + + // Create a version to acquire the deployment lock. + version, versionErr := svc.CreateVersion(ctx, tmpdms.CreateVersionRequest{ + DeploymentID: deploymentID, + Parent: "deployments/" + deploymentID, + VersionID: versionID, + Version: &tmpdms.Version{ + DisplayName: b.Config.Bundle.Name, + DeploymentMode: deploymentMode(b.Config.Bundle.Mode), + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + // Same git provenance the CLI records in metadata.json. + GitInfo: &tmpdms.GitInfo{ + OriginURL: b.Config.Bundle.Git.OriginURL, + Branch: b.Config.Bundle.Git.Branch, + Commit: b.Config.Bundle.Git.Commit, + }, + }, + }) + if versionErr != nil { + return "", "", fmt.Errorf("failed to acquire deployment lock: %w", versionErr) + } + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, version.VersionID) + return deploymentID, versionID, nil +} + +// resolveDeploymentID reads the deployment ID from managed_service.json in the +// workspace state directory. If the file doesn't exist or has no deployment ID, +// a new UUID is generated. The boolean return indicates whether this is a fresh +// deployment (true) or an existing one (false). For fresh deployments, the +// caller is responsible for writing the deployment ID to workspace after the +// server-side deployment record is created successfully. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, bool, error) { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return "", false, fmt.Errorf("failed to create state filer: %w", err) + } + + // Try reading existing deployment ID from managed_service.json. + reader, readErr := f.Read(ctx, statemgmt.ManagedServiceFileName) + if readErr == nil { + defer reader.Close() + data, err := io.ReadAll(reader) + if err != nil { + return "", false, fmt.Errorf("failed to read %s content: %w", statemgmt.ManagedServiceFileName, err) + } + var sj statemgmt.ManagedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + return "", false, fmt.Errorf("failed to parse %s: %w", statemgmt.ManagedServiceFileName, err) + } + if sj.DeploymentID != "" { + return sj.DeploymentID, false, nil + } + } else if !errors.Is(readErr, fs.ErrNotExist) { + return "", false, fmt.Errorf("failed to read %s: %w", statemgmt.ManagedServiceFileName, readErr) + } + + // Fresh deployment: generate a new ID but don't write yet. + return uuid.New().String(), true, nil +} + +// writeDeploymentID writes the deployment ID to managed_service.json in the +// workspace state directory. This should be called after the server-side +// deployment record is created successfully. +func writeDeploymentID(ctx context.Context, b *bundle.Bundle, deploymentID string) error { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + return fmt.Errorf("failed to create state filer: %w", err) + } + sj := statemgmt.ManagedServiceJSON{DeploymentID: deploymentID} + data, err := json.Marshal(sj) + if err != nil { + return fmt.Errorf("failed to marshal %s: %w", statemgmt.ManagedServiceFileName, err) + } + err = f.Write(ctx, statemgmt.ManagedServiceFileName, bytes.NewReader(data), filer.CreateParentDirectories, filer.OverwriteIfExists) + if err != nil { + return fmt.Errorf("failed to write %s: %w", statemgmt.ManagedServiceFileName, err) + } + return nil +} + +// makeSyncReporter returns the synchronous "send one event to DMS" function +// consumed by asyncReporter's sender goroutine. Skip-actions short-circuit to +// nil; mapping errors and API errors are returned to the caller (which logs +// and continues — see asyncReporter). +func makeSyncReporter(svc *tmpdms.DeploymentMetadataAPI, deploymentID, versionID string) func(context.Context, operationEvent) error { + return func(ctx context.Context, ev operationEvent) error { + // The internal state DB uses "resources.jobs.foo" keys but the API + // expects "jobs.foo" — strip the "resources." prefix. + apiKey := strings.TrimPrefix(ev.resourceKey, "resources.") + actionType, err := planActionToOperationAction(ev.action) + if err != nil { + return fmt.Errorf("mapping action for resource %s: %w", ev.resourceKey, err) + } + if actionType == "" { + return nil + } + + status := tmpdms.OperationStatusSucceeded + var errorMessage string + if ev.operationErr != nil { + status = tmpdms.OperationStatusFailed + errorMessage = ev.operationErr.Error() + } + + op := &tmpdms.Operation{ + ResourceKey: apiKey, + ResourceID: ev.resourceID, + Status: status, + ActionType: actionType, + ErrorMessage: errorMessage, + } + if len(ev.state) > 0 { + op.State = ev.state + } + + _, err = svc.CreateOperation(ctx, tmpdms.CreateOperationRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: apiKey, + Operation: op, + }) + if err != nil { + return fmt.Errorf("reporting operation for resource %s: %w", ev.resourceKey, err) + } + return nil + } +} + +// planActionToOperationAction maps a deploy plan action to a metadata service +// operation action type. No-op actions like Skip return ("", nil) and should +// be ignored. +func planActionToOperationAction(action deployplan.ActionType) (tmpdms.OperationActionType, error) { + switch action { + case deployplan.Skip: + return "", nil + case deployplan.Create: + return tmpdms.OperationActionTypeCreate, nil + case deployplan.Update: + return tmpdms.OperationActionTypeUpdate, nil + case deployplan.UpdateWithID: + return tmpdms.OperationActionTypeUpdateWithID, nil + case deployplan.Delete: + return tmpdms.OperationActionTypeDelete, nil + case deployplan.Recreate: + return tmpdms.OperationActionTypeRecreate, nil + case deployplan.Resize: + return tmpdms.OperationActionTypeResize, nil + default: + return "", fmt.Errorf("unsupported operation action type: %s", action) + } +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment lock alive. Returns a cancel function to stop the heartbeat. +func startHeartbeat(ctx context.Context, svc *tmpdms.DeploymentMetadataAPI, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := svc.Heartbeat(ctx, tmpdms.HeartbeatRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + }) + if err != nil { + // A 409 ABORTED is expected if the version was completed + // between the ticker firing and the heartbeat request. + if isAborted(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", deploymentID, versionID) + } + } + } + }() + + return cancel +} + +// isAborted checks if an error indicates the operation was aborted (HTTP 409 with ABORTED error code). +func isAborted(err error) bool { + apiErr, ok := errors.AsType[*apierr.APIError](err) + return ok && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" +} + +// deploymentMode maps a bundle target mode to the DMS deployment mode enum. +// Unset target modes produce an empty value, which is omitted from the request. +func deploymentMode(mode config.Mode) tmpdms.DeploymentMode { + switch mode { + case config.Development: + return tmpdms.DeploymentModeDevelopment + case config.Production: + return tmpdms.DeploymentModeProduction + default: + return "" + } +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 00000000000..4217a47c69c --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,73 @@ +package lock + +import ( + "testing" + + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/libs/tmpdms" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlanActionToOperationAction(t *testing.T) { + tests := []struct { + action deployplan.ActionType + expected tmpdms.OperationActionType + }{ + {deployplan.Skip, ""}, + {deployplan.Create, tmpdms.OperationActionTypeCreate}, + {deployplan.Update, tmpdms.OperationActionTypeUpdate}, + {deployplan.UpdateWithID, tmpdms.OperationActionTypeUpdateWithID}, + {deployplan.Delete, tmpdms.OperationActionTypeDelete}, + {deployplan.Recreate, tmpdms.OperationActionTypeRecreate}, + {deployplan.Resize, tmpdms.OperationActionTypeResize}, + {"unknown_action", ""}, + } + + for _, tt := range tests { + t.Run(string(tt.action), func(t *testing.T) { + result, err := planActionToOperationAction(tt.action) + if tt.action == "unknown_action" { + assert.ErrorContains(t, err, "unsupported operation action type") + return + } + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGoalToVersionType(t *testing.T) { + vt, ok := goalToVersionType(GoalDeploy) + assert.True(t, ok) + assert.Equal(t, tmpdms.VersionTypeDeploy, vt) + + vt, ok = goalToVersionType(GoalDestroy) + assert.True(t, ok) + assert.Equal(t, tmpdms.VersionTypeDestroy, vt) + + _, ok = goalToVersionType(GoalBind) + assert.False(t, ok) + + _, ok = goalToVersionType(GoalUnbind) + assert.False(t, ok) +} + +func TestDeploymentMode(t *testing.T) { + tests := []struct { + mode config.Mode + expected tmpdms.DeploymentMode + }{ + {config.Development, tmpdms.DeploymentModeDevelopment}, + {config.Production, tmpdms.DeploymentModeProduction}, + {"", ""}, + {"unknown", ""}, + } + + for _, tt := range tests { + t.Run(string(tt.mode), func(t *testing.T) { + assert.Equal(t, tt.expected, deploymentMode(tt.mode)) + }) + } +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go new file mode 100644 index 00000000000..2d08c52ba41 --- /dev/null +++ b/bundle/deploy/lock/lock.go @@ -0,0 +1,78 @@ +package lock + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" + "github.com/databricks/cli/libs/tmpdms" +) + +// Goal describes the purpose of a deployment operation. +type Goal string + +const ( + GoalBind = Goal("bind") + GoalUnbind = Goal("unbind") + GoalDeploy = Goal("deploy") + GoalDestroy = Goal("destroy") +) + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentLock manages the deployment lock lifecycle. +type DeploymentLock interface { + // Acquire acquires the deployment lock. + Acquire(ctx context.Context) error + + // Release releases the deployment lock with the given deployment status. + Release(ctx context.Context, status DeploymentStatus) error +} + +// NewDeploymentLock returns a DeploymentLock implementation based on the +// current environment. If managed state is enabled and the goal maps to a +// supported version type, a metadata service lock is returned. Otherwise, +// a workspace filesystem lock is returned. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) DeploymentLock { + useManagedState, _ := env.ManagedState(ctx) + if useManagedState == "true" { + versionType, ok := goalToVersionType(goal) + if ok { + return newMetadataServiceLock(b, versionType) + } + // Bind and unbind are not yet supported with the deployment metadata service. + return &unsupportedLock{goal: goal} + } + return newWorkspaceFilesystemLock(b, goal) +} + +// unsupportedLock is returned when a goal is not supported with DMS. +type unsupportedLock struct { + goal Goal +} + +func (l *unsupportedLock) Acquire(context.Context) error { + return fmt.Errorf("%s is not supported with the deployment metadata service", l.goal) +} + +func (l *unsupportedLock) Release(context.Context, DeploymentStatus) error { + return nil +} + +func goalToVersionType(goal Goal) (tmpdms.VersionType, bool) { + switch goal { + case GoalDeploy: + return tmpdms.VersionTypeDeploy, true + case GoalDestroy: + return tmpdms.VersionTypeDestroy, true + default: + return "", false + } +} diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go deleted file mode 100644 index 26f95edfc95..00000000000 --- a/bundle/deploy/lock/release.go +++ /dev/null @@ -1,58 +0,0 @@ -package lock - -import ( - "context" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type Goal string - -const ( - GoalBind = Goal("bind") - GoalUnbind = Goal("unbind") - GoalDeploy = Goal("deploy") - GoalDestroy = Goal("destroy") -) - -type release struct { - goal Goal -} - -func Release(goal Goal) bundle.Mutator { - return &release{goal} -} - -func (m *release) Name() string { - return "lock:release" -} - -func (m *release) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - // Return early if the locker is not set. - // It is likely an error occurred prior to initialization of the locker instance. - if b.Locker == nil { - log.Warnf(ctx, "Unable to release lock if locker is not configured") - return nil - } - - log.Infof(ctx, "Releasing deployment lock") - switch m.goal { - case GoalDeploy: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalBind, GoalUnbind: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalDestroy: - return diag.FromErr(b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)) - default: - return diag.Errorf("unknown goal for lock release: %s", m.goal) - } -} diff --git a/bundle/deploy/lock/workspace_filesystem.go b/bundle/deploy/lock/workspace_filesystem.go new file mode 100644 index 00000000000..de4ba792386 --- /dev/null +++ b/bundle/deploy/lock/workspace_filesystem.go @@ -0,0 +1,75 @@ +package lock + +import ( + "context" + "errors" + "io/fs" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/libs/locker" + "github.com/databricks/cli/libs/log" +) + +type workspaceFilesystemLock struct { + b *bundle.Bundle + goal Goal +} + +func newWorkspaceFilesystemLock(b *bundle.Bundle, goal Goal) *workspaceFilesystemLock { + return &workspaceFilesystemLock{b: b, goal: goal} +} + +func (l *workspaceFilesystemLock) Acquire(ctx context.Context) error { + b := l.b + + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + user := b.Config.Workspace.CurrentUser.UserName + dir := b.Config.Workspace.StatePath + lk, err := locker.CreateLocker(user, dir, b.WorkspaceClient(ctx)) + if err != nil { + return err + } + + b.Locker = lk + + force := b.Config.Bundle.Deployment.Lock.Force + log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) + err = lk.Lock(ctx, force) + if err != nil { + log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) + + if errors.Is(err, fs.ErrPermission) || errors.Is(err, fs.ErrNotExist) { + diags := permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) + return diags.Error() + } + + return err + } + + return nil +} + +func (l *workspaceFilesystemLock) Release(ctx context.Context, _ DeploymentStatus) error { + b := l.b + + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + if b.Locker == nil { + log.Warnf(ctx, "Unable to release lock if locker is not configured") + return nil + } + + log.Infof(ctx, "Releasing deployment lock") + if l.goal == GoalDestroy { + return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist) + } + return b.Locker.Unlock(ctx) +} diff --git a/bundle/deploy/metadata/annotate_deployment_version.go b/bundle/deploy/metadata/annotate_deployment_version.go new file mode 100644 index 00000000000..5929ead1e89 --- /dev/null +++ b/bundle/deploy/metadata/annotate_deployment_version.go @@ -0,0 +1,42 @@ +package metadata + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" +) + +type annotateDeploymentVersion struct{} + +// AnnotateDeploymentVersion records the DMS deployment_id and version_id on the +// deployment block of every job and pipeline. Unlike AnnotateJobs/AnnotatePipelines +// (which run during Initialize), these IDs are only known after the deployment +// lock is acquired, so this mutator runs in the deploy phase. It is a no-op when +// the deployment metadata service is not enabled (DeploymentID is empty). +func AnnotateDeploymentVersion() bundle.Mutator { + return &annotateDeploymentVersion{} +} + +func (m *annotateDeploymentVersion) Name() string { + return "metadata.AnnotateDeploymentVersion" +} + +func (m *annotateDeploymentVersion) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics { + if b.DeploymentID == "" { + return nil + } + + // AnnotateJobs and AnnotatePipelines run during Initialize and always set + // the Deployment block, so it is non-nil here. + for _, job := range b.Config.Resources.Jobs { + job.Deployment.DeploymentId = b.DeploymentID + job.Deployment.VersionId = b.DeploymentVersionID + } + for _, pipeline := range b.Config.Resources.Pipelines { + pipeline.Deployment.DeploymentId = b.DeploymentID + pipeline.Deployment.VersionId = b.DeploymentVersionID + } + + return nil +} diff --git a/bundle/deploy/metadata/annotate_deployment_version_test.go b/bundle/deploy/metadata/annotate_deployment_version_test.go new file mode 100644 index 00000000000..47cd2572c3f --- /dev/null +++ b/bundle/deploy/metadata/annotate_deployment_version_test.go @@ -0,0 +1,73 @@ +package metadata + +import ( + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func bundleWithDeploymentBlocks() *bundle.Bundle { + return &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "my-job": { + JobSettings: jobs.JobSettings{ + Name: "My Job", + Deployment: &jobs.JobDeployment{Kind: jobs.JobDeploymentKindBundle, MetadataFilePath: "/a/b/c/metadata.json"}, + }, + }, + }, + Pipelines: map[string]*resources.Pipeline{ + "my-pipeline": { + CreatePipeline: pipelines.CreatePipeline{ + Name: "My Pipeline", + Deployment: &pipelines.PipelineDeployment{Kind: pipelines.DeploymentKindBundle, MetadataFilePath: "/a/b/c/metadata.json"}, + }, + }, + }, + }, + }, + } +} + +func TestAnnotateDeploymentVersionStampsIDs(t *testing.T) { + b := bundleWithDeploymentBlocks() + b.DeploymentID = "dep-123" + b.DeploymentVersionID = "7" + + diags := AnnotateDeploymentVersion().Apply(t.Context(), b) + require.NoError(t, diags.Error()) + + job := b.Config.Resources.Jobs["my-job"].Deployment + assert.Equal(t, "dep-123", job.DeploymentId) + assert.Equal(t, "7", job.VersionId) + // Existing fields are preserved. + assert.Equal(t, jobs.JobDeploymentKindBundle, job.Kind) + assert.Equal(t, "/a/b/c/metadata.json", job.MetadataFilePath) + + pipeline := b.Config.Resources.Pipelines["my-pipeline"].Deployment + assert.Equal(t, "dep-123", pipeline.DeploymentId) + assert.Equal(t, "7", pipeline.VersionId) + assert.Equal(t, pipelines.DeploymentKindBundle, pipeline.Kind) + assert.Equal(t, "/a/b/c/metadata.json", pipeline.MetadataFilePath) +} + +func TestAnnotateDeploymentVersionNoopWithoutDMS(t *testing.T) { + b := bundleWithDeploymentBlocks() + // DeploymentID is empty: the deployment metadata service is not in use. + + diags := AnnotateDeploymentVersion().Apply(t.Context(), b) + require.NoError(t, diags.Error()) + + assert.Empty(t, b.Config.Resources.Jobs["my-job"].Deployment.DeploymentId) + assert.Empty(t, b.Config.Resources.Jobs["my-job"].Deployment.VersionId) + assert.Empty(t, b.Config.Resources.Pipelines["my-pipeline"].Deployment.DeploymentId) + assert.Empty(t, b.Config.Resources.Pipelines["my-pipeline"].Deployment.VersionId) +} diff --git a/bundle/deployplan/plan.go b/bundle/deployplan/plan.go index 2fb5d38c806..d3c2d849f8a 100644 --- a/bundle/deployplan/plan.go +++ b/bundle/deployplan/plan.go @@ -16,11 +16,13 @@ import ( const currentPlanVersion = 2 type Plan struct { - PlanVersion int `json:"plan_version,omitempty"` - CLIVersion string `json:"cli_version,omitempty"` - Lineage string `json:"lineage,omitempty"` - Serial int `json:"serial,omitempty"` - Plan map[string]*PlanEntry `json:"plan,omitzero"` + PlanVersion int `json:"plan_version,omitempty"` + CLIVersion string `json:"cli_version,omitempty"` + // Lineage and Serial are used in file-based direct deployments. For DMS + // deployments, they are replaced by deployment ID and version ID. + Lineage string `json:"lineage,omitempty"` + Serial int `json:"serial,omitempty"` + Plan map[string]*PlanEntry `json:"plan,omitzero"` mutex sync.Mutex `json:"-"` lockmap lockmap `json:"-"` diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index 16b145f7af8..d005b633e35 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -22,6 +22,11 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa panic("Planning is not done") } + if migrateMode && b.OperationReporter != nil { + logdiag.LogError(ctx, errors.New("migration is not supported with the deployment metadata service")) + return + } + if len(plan.Plan) == 0 { // Avoid creating state file if nothing to deploy return @@ -86,7 +91,23 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: Unexpected delete action during migration", errorPrefix)) return false } + + // Capture the resource ID before deletion for operation reporting. + var deleteResourceID string + if b.OperationReporter != nil { + if dbentry, ok := b.StateDB.GetResourceEntry(resourceKey); ok { + deleteResourceID = dbentry.ID + } + } + err = d.Destroy(ctx, &b.StateDB) + if b.OperationReporter != nil { + reportErr := b.OperationReporter(ctx, resourceKey, deleteResourceID, action, err, nil) + if reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -126,6 +147,23 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa err = d.Deploy(ctx, &b.StateDB, sv.Value, action, entry) } + // Report the operation inline to the metadata service. + if b.OperationReporter != nil { + // Data.State (via GetResourceEntry) is not updated until the WAL is + // merged, so during a deploy the ID comes from GetResourceID and the + // just-applied state from sv.Value — the same value SaveState persists. + resourceID := b.StateDB.GetResourceID(resourceKey) + resourceState, marshalErr := json.Marshal(sv.Value) + if marshalErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: serializing state for operation: %w", errorPrefix, marshalErr)) + return false + } + if reportErr := b.OperationReporter(ctx, resourceKey, resourceID, action, err, resourceState); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } + if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index 5591626cd75..1e3e0607e12 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -40,6 +40,8 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error { // ValidatePlanAgainstState validates that a plan's lineage and serial match the given state. // If the plan has no lineage (first deployment), validation is skipped. +// Serialized plans are not supported with DMS today. When support is added, +// similar validation will be needed using the deployment ID and version ID. func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error { if plan.Lineage == "" { return nil diff --git a/bundle/direct/dresources/resources.yml b/bundle/direct/dresources/resources.yml index 8da2d5fee50..0829ae8eb77 100644 --- a/bundle/direct/dresources/resources.yml +++ b/bundle/direct/dresources/resources.yml @@ -16,6 +16,15 @@ resources: jobs: + ignore_local_changes: + # version_id is stamped with the current DMS deployment version on every + # deploy, so it changes constantly. Ignoring it as a local change keeps a + # new version from triggering an update on its own. When the job is updated + # for any other reason, DoUpdate sends the full config via Reset, so the + # current version_id is still recorded. + - field: deployment.version_id + reason: managed by the deployment metadata service + ignore_remote_changes: # Same as clusters.{aws,azure,gcp}_attributes — see clusters/resource_cluster.go#L361-L363 # s.SchemaPath("aws_attributes").SetSuppressDiff() @@ -131,6 +140,11 @@ resources: # "id" is output-only, providing it in config would be a mistake - field: id reason: "!drop" + # version_id is stamped with the current DMS deployment version on every + # deploy. Ignoring it as a local change keeps a new version from triggering + # an update on its own; a real update still sends it via the full config. + - field: deployment.version_id + reason: managed by the deployment metadata service backend_defaults: # https://github.com/databricks/terraform-provider-databricks/blob/4eba541abe1a9f50993ea7b9dd83874207e224a1/pipelines/resource_pipeline.go#L238 diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 48a9c5a2ff7..a05df2f5110 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -2,6 +2,7 @@ package direct import ( "context" + "encoding/json" "fmt" "reflect" "sync" @@ -37,6 +38,19 @@ type DeploymentUnit struct { DependsOn []deployplan.DependsOnEntry } +// OperationReporter is called after each resource operation (success or failure) +// to report it to the deployment metadata service. The state parameter contains +// the resource's post-operation state (nil for deletes or failures). Returns an +// error if reporting fails; callers must treat this as a deployment failure. +type OperationReporter func( + ctx context.Context, + resourceKey string, + resourceID string, + action deployplan.ActionType, + operationErr error, + state json.RawMessage, +) error + // DeploymentBundle holds everything needed to deploy a bundle type DeploymentBundle struct { StateDB dstate.DeploymentState @@ -44,6 +58,10 @@ type DeploymentBundle struct { Plan *deployplan.Plan RemoteStateCache sync.Map StateCache structvar.Cache + + // OperationReporter, when set, is called inline after each successful + // resource Create/Update/Delete to report the operation to the metadata service. + OperationReporter OperationReporter } // SetRemoteState updates the remote state with type validation and marks as fresh. diff --git a/bundle/direct/version_id_ignore_test.go b/bundle/direct/version_id_ignore_test.go new file mode 100644 index 00000000000..196e46b548a --- /dev/null +++ b/bundle/direct/version_id_ignore_test.go @@ -0,0 +1,46 @@ +package direct + +import ( + "testing" + + "github.com/databricks/cli/bundle/direct/dresources" + "github.com/databricks/cli/libs/structs/structdiff" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// The deployment metadata service stamps a new version_id onto every job and +// pipeline on each deploy. These tests pin the invariant that a version_id-only +// change is classified as an ignored local change, so it never triggers an +// update on its own. They diff the real resource state structs so the field +// path the engine produces is verified against the rule, not just hard-coded. + +func TestDeploymentVersionIDIgnoredForJobs(t *testing.T) { + a := jobs.JobSettings{Deployment: &jobs.JobDeployment{Kind: jobs.JobDeploymentKindBundle, VersionId: "1"}} + b := jobs.JobSettings{Deployment: &jobs.JobDeployment{Kind: jobs.JobDeploymentKindBundle, VersionId: "2"}} + + changes, err := structdiff.GetStructDiff(a, b, nil) + require.NoError(t, err) + require.Len(t, changes, 1) + require.Equal(t, "deployment.version_id", changes[0].Path.String()) + + cfg := dresources.GetResourceConfig("jobs") + _, ok := findMatchingRule(changes[0].Path, cfg.IgnoreLocalChanges) + assert.True(t, ok, "deployment.version_id must match a jobs ignore_local_changes rule") +} + +func TestDeploymentVersionIDIgnoredForPipelines(t *testing.T) { + a := pipelines.CreatePipeline{Deployment: &pipelines.PipelineDeployment{Kind: pipelines.DeploymentKindBundle, VersionId: "1"}} + b := pipelines.CreatePipeline{Deployment: &pipelines.PipelineDeployment{Kind: pipelines.DeploymentKindBundle, VersionId: "2"}} + + changes, err := structdiff.GetStructDiff(a, b, nil) + require.NoError(t, err) + require.Len(t, changes, 1) + require.Equal(t, "deployment.version_id", changes[0].Path.String()) + + cfg := dresources.GetResourceConfig("pipelines") + _, ok := findMatchingRule(changes[0].Path, cfg.IgnoreLocalChanges) + assert.True(t, ok, "deployment.version_id must match a pipelines ignore_local_changes rule") +} diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 00000000000..a4d08c7cd02 --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,15 @@ +package env + +import "context" + +// managedStateVariable names the environment variable that controls whether +// server-managed state is used for locking and resource state management. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the environment variable that controls whether +// server-managed state is used for locking and resource state management. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{ + managedStateVariable, + }) +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index 48ba7755714..8bcba139445 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -23,13 +23,15 @@ import ( func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions, engine engine.EngineType) { log.Info(ctx, "Phase: bind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalBind)) + if err := dl.Release(ctx, lock.DeploymentSuccess); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { @@ -119,13 +121,15 @@ func jsonDump(ctx context.Context, v any, field string) string { func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, resourceKey string, engine engine.EngineType) { log.Info(ctx, "Phase: unbind") - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalUnbind)) + if err := dl.Release(ctx, lock.DeploymentSuccess); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 3cac322f9e3..f108df0fd63 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -124,21 +124,24 @@ func uploadLibraries(ctx context.Context, b *bundle.Bundle, libs map[string][]li func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, engine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { log.Info(ctx, "Phase: deploy") - // Core mutators that CRUD resources and modify deployment state. These - // mutators need informed consent if they are potentially destructive. - bundle.ApplySeqContext(ctx, b, - scripts.Execute(config.ScriptPreDeploy), - lock.Acquire(), - ) - + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) if logdiag.HasError(ctx) { - // lock is not acquired here return } - // lock is acquired here + dl := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() uploadLibraries(ctx, b, libs) @@ -146,6 +149,13 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } + // Stamp the DMS deployment_id/version_id onto resources now that the lock is + // held and the version is known, so they are part of the config the plan diffs. + bundle.ApplyContext(ctx, b, metadata.AnnotateDeploymentVersion()) + if logdiag.HasError(ctx) { + return + } + bundle.ApplySeqContext(ctx, b, files.Upload(outputHandler), deploy.StateUpdate(), diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 95eec600dc2..00dca30fb42 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -120,13 +120,19 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if !engine.IsDirect() { diff --git a/bundle/statemgmt/managed_service_json.go b/bundle/statemgmt/managed_service_json.go new file mode 100644 index 00000000000..7e688e5ebee --- /dev/null +++ b/bundle/statemgmt/managed_service_json.go @@ -0,0 +1,14 @@ +package statemgmt + +// ManagedServiceFileName is the filename for ManagedServiceJSON in the workspace +// state directory. +const ManagedServiceFileName = "managed_service.json" + +// ManagedServiceJSON holds DMS-specific bookkeeping (e.g. the deployment_id +// that ties this bundle to a server-side deployment record). It lives next to +// resources.json in the workspace state directory. resources.json is kept +// around and maintained alongside this file so users have a backward path if +// they hit issues with the DMS-backed deployment flow. +type ManagedServiceJSON struct { + DeploymentID string `json:"deployment_id"` +} diff --git a/bundle/statemgmt/state_dms.go b/bundle/statemgmt/state_dms.go new file mode 100644 index 00000000000..fd83735d660 --- /dev/null +++ b/bundle/statemgmt/state_dms.go @@ -0,0 +1,60 @@ +package statemgmt + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/direct/dstate" + "github.com/databricks/cli/libs/tmpdms" +) + +// LoadStateFromDMS loads resource state from the deployment metadata service +// into the state DB. It builds the in-memory database from the server's +// resource list and opens the state DB with it, which is the read-mode +// equivalent of opening a local state file when DMS is not in use. +func LoadStateFromDMS(ctx context.Context, b *bundle.Bundle) error { + if b.DeploymentID == "" { + return nil + } + + svc, err := tmpdms.NewDeploymentMetadataAPI(b.WorkspaceClient(ctx)) + if err != nil { + return fmt.Errorf("failed to create metadata service client: %w", err) + } + + resources, err := svc.ListResources(ctx, tmpdms.ListResourcesRequest{ + DeploymentID: b.DeploymentID, + }) + if err != nil { + return fmt.Errorf("failed to list resources from deployment metadata service: %w", err) + } + + data := dstate.NewDatabase("", 0) + for _, r := range resources { + // The DMS stores keys without the "resources." prefix (e.g., "jobs.foo"). + // The state DB expects the full key (e.g., "resources.jobs.foo"). + resourceKey := "resources." + r.ResourceKey + + var stateBytes json.RawMessage + if r.State != nil { + stateBytes, err = json.Marshal(r.State) + if err != nil { + return fmt.Errorf("marshaling state for %s: %w", resourceKey, err) + } + } + + data.State[resourceKey] = dstate.ResourceEntry{ + ID: r.ResourceID, + State: stateBytes, + } + } + + // OpenWithData populates the resource-key→ID index that GetResourceID relies + // on. Writing Data.State directly would leave that index empty, so deletes + // would fail with "missing in state". + _, localPath := b.StateFilenameDirect(ctx) + b.DeploymentBundle.StateDB.OpenWithData(localPath, data) + return nil +} diff --git a/bundle/statemgmt/state_pull.go b/bundle/statemgmt/state_pull.go index 7e62bb84967..59745b87a9c 100644 --- a/bundle/statemgmt/state_pull.go +++ b/bundle/statemgmt/state_pull.go @@ -16,6 +16,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -219,6 +220,18 @@ func readStates(ctx context.Context, b *bundle.Bundle, alwaysPull AlwaysPull) [] directLocalState := localRead(ctx, localPathDirect, engine.EngineDirect) terraformLocalState := localRead(ctx, localPathTerraform, engine.EngineTerraform) + // When DMS is enabled, read the deployment ID from workspace and return + // early. State is loaded from the server later via LoadStateFromDMS. + if useDMS, _ := env.ManagedState(ctx); useDMS == "true" { + f, err := deploy.StateFiler(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + return nil + } + b.DeploymentID = readDeploymentID(ctx, f) + return nil + } + if (directLocalState == nil && terraformLocalState == nil) || alwaysPull { f, err := deploy.StateFiler(ctx, b) if err != nil { @@ -305,3 +318,31 @@ func logStatesDiag(ctx context.Context, severity diag.Severity, msg string, stat Detail: "Available state files:\n- " + strings.Join(stateStrs, "\n- "), }) } + +// readDeploymentID reads the DMS deployment ID from the workspace +// managed_service.json. Returns "" if the file doesn't exist or doesn't +// contain a deployment_id. +func readDeploymentID(ctx context.Context, f filer.Filer) string { + reader, err := f.Read(ctx, ManagedServiceFileName) + if errors.Is(err, fs.ErrNotExist) { + return "" + } + if err != nil { + log.Debugf(ctx, "Failed to read %s for deployment ID: %v", ManagedServiceFileName, err) + return "" + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + log.Debugf(ctx, "Failed to read %s content: %v", ManagedServiceFileName, err) + return "" + } + + var sj ManagedServiceJSON + if err := json.Unmarshal(data, &sj); err != nil { + log.Debugf(ctx, "Failed to parse %s: %v", ManagedServiceFileName, err) + return "" + } + return sj.DeploymentID +} diff --git a/bundle/statemgmt/state_push.go b/bundle/statemgmt/state_push.go index f098e8a07cc..a75301ecb78 100644 --- a/bundle/statemgmt/state_push.go +++ b/bundle/statemgmt/state_push.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -16,7 +17,15 @@ import ( ) // PushResourcesState uploads the local state file to the remote location. +// When the deployment metadata service is enabled, state is managed by the +// server and no local push is needed. func PushResourcesState(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { + // When DMS is active, state is persisted per-operation to the server. + // No local state file to push. + if useDMS, _ := env.ManagedState(ctx); useDMS == "true" { + return + } + f, err := deploy.StateFiler(ctx, b) if err != nil { logdiag.LogError(ctx, err) diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index d61c4525530..bbb9ae84534 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -194,13 +194,21 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle return b, stateDesc, root.ErrAlreadyPrinted } - // Open direct engine state once for all subsequent operations (ExportState, CalculatePlan, Apply, etc.) + // Open direct engine state once for all subsequent operations (ExportState, CalculatePlan, Apply, etc.). + // When DMS is active, load state from the server instead of the local state file. needDirectState := stateDesc.Engine.IsDirect() && (opts.InitIDs || opts.ErrorOnEmptyState || opts.Deploy || opts.ReadPlanPath != "" || opts.PreDeployChecks || opts.PostStateFunc != nil) if needDirectState { - _, localPath := b.StateFilenameDirect(ctx) - if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { - logdiag.LogError(ctx, err) - return b, stateDesc, root.ErrAlreadyPrinted + if b.DeploymentID != "" { + if err := statemgmt.LoadStateFromDMS(ctx, b); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } + } else { + _, localPath := b.StateFilenameDirect(ctx) + if err := b.DeploymentBundle.StateDB.Open(ctx, localPath, dstate.WithRecovery(true), dstate.WithWrite(false)); err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } } } @@ -242,6 +250,10 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle logdiag.LogError(ctx, errors.New("--plan is only supported with direct engine (set bundle.engine to \"direct\" or DATABRICKS_BUNDLE_ENGINE=direct)")) return b, stateDesc, root.ErrAlreadyPrinted } + if b.DeploymentID != "" { + logdiag.LogError(ctx, errors.New("--plan is not supported with the deployment metadata service")) + return b, stateDesc, root.ErrAlreadyPrinted + } opts.Build = false opts.PreDeployChecks = false @@ -256,12 +268,15 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle log.Warnf(ctx, "Plan was created with CLI version %s but current version is %s", plan.CLIVersion, currentVersion) } - // Validate that the plan's lineage and serial match the current state - // This must happen before any file operations - err = direct.ValidatePlanAgainstState(&b.DeploymentBundle.StateDB, plan) - if err != nil { - logdiag.LogError(ctx, err) - return b, stateDesc, root.ErrAlreadyPrinted + // Validate that the plan's lineage and serial match the current state. + // When DMS is active, the server validates version ordering during lock + // acquisition, so local state checks are unnecessary. + if b.DeploymentID == "" { + err = direct.ValidatePlanAgainstState(&b.DeploymentBundle.StateDB, plan) + if err != nil { + logdiag.LogError(ctx, err) + return b, stateDesc, root.ErrAlreadyPrinted + } } } else if opts.Deploy { opts.Build = true diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 00000000000..a612aae817a --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,415 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/databricks/cli/libs/tmpdms" +) + +// deploymentMetadata holds in-memory state for the deployment metadata service. +// Stored per-workspace inside FakeWorkspace. +type deploymentMetadata struct { + // deployments keyed by deployment_id + deployments map[string]tmpdms.Deployment + + // versions keyed by "deploymentId/versionId" + versions map[string]tmpdms.Version + + // operations keyed by "deploymentId/versionId/resourceKey" + operations map[string]tmpdms.Operation + + // resources keyed by "deploymentId/resourceKey" + resources map[string]tmpdms.Resource + + // lock state per deployment: which version holds the lock and when it expires + lockHolder map[string]string // deploymentId -> "deployments/{id}/versions/{vid}" + lockExpiry map[string]time.Time // deploymentId -> expiry time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]tmpdms.Deployment{}, + versions: map[string]tmpdms.Version{}, + operations: map[string]tmpdms.Operation{}, + resources: map[string]tmpdms.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +const lockDuration = 2 * time.Minute + +func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + // deployment_id is a query parameter, not in the body. + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "deployment_id is required"}, + } + } + + // The body maps to the Deployment sub-message. + var bodyDeployment tmpdms.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyDeployment); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ALREADY_EXISTS", "message": fmt.Sprintf("deployment %s already exists", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment := tmpdms.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: bodyDeployment.TargetName, + Status: tmpdms.DeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + UpdateTime: &now, + } + + state.deployments[deploymentID] = deployment + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment.Status = tmpdms.DeploymentStatusDeleted + deployment.DestroyTime = &now + deployment.DestroyedBy = s.CurrentUser().UserName + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + // version_id is a query parameter, not in the body. + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "version_id is required"}, + } + } + + // The body maps to the Version sub-message. + var bodyVersion tmpdms.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyVersion); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + // Validate version_id == last_version_id + 1 (matching server behavior). + var expectedVersionID string + if deployment.LastVersionID == "" { + expectedVersionID = "1" + } else { + lastVersion, err := strconv.ParseInt(deployment.LastVersionID, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{"error_code": "INTERNAL_ERROR", "message": "stored last_version_id is not a valid number: " + deployment.LastVersionID}, + } + } + expectedVersionID = strconv.FormatInt(lastVersion+1, 10) + } + if versionID != expectedVersionID { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", expectedVersionID, versionID), + }, + } + } + + // Check lock: if a lock is held and not expired, reject with 409. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if expiry, ok := state.lockExpiry[deploymentID]; ok && expiry.After(now) { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("deployment is locked by %s until %s", holder, expiry.Format(time.RFC3339)), + }, + } + } + } + + versionKey := deploymentID + "/" + versionID + version := tmpdms.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionID: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + Status: tmpdms.VersionStatusInProgress, + } + version.CliVersion = bodyVersion.CliVersion + version.VersionType = bodyVersion.VersionType + version.TargetName = bodyVersion.TargetName + + state.versions[versionKey] = version + + // Acquire the lock. + lockExpiry := now.Add(lockDuration) + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = lockExpiry + + // Update the deployment's last_version_id and status. + deployment.LastVersionID = versionID + deployment.Status = tmpdms.DeploymentStatusInProgress + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + + if version.Status != tmpdms.VersionStatusInProgress { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "version is no longer in progress"}, + } + } + + // Verify this version holds the lock. + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "lock is not held by this version"}, + } + } + + // Renew the lock. + now := time.Now().UTC() + newExpiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = newExpiry + + return Response{Body: tmpdms.HeartbeatResponse{ExpireTime: &newExpiry}} +} + +func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + // Allow tests to simulate a complete version failure. If the deployment's + // target_name is "fail-complete", return a 500 error. + if deployment, ok := state.deployments[deploymentID]; ok && deployment.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{"error_code": "INTERNAL_ERROR", "message": "simulated complete version failure"}, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + + if version.Status != tmpdms.VersionStatusInProgress { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "version is already completed"}, + } + } + + var completeReq tmpdms.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &completeReq); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + + now := time.Now().UTC() + version.Status = tmpdms.VersionStatusCompleted + version.CompleteTime = &now + version.CompletionReason = completeReq.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + // Release the lock. + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + // Update deployment status based on completion reason. + if deployment, ok := state.deployments[deploymentID]; ok { + switch completeReq.CompletionReason { + case tmpdms.VersionCompleteSuccess: + deployment.Status = tmpdms.DeploymentStatusActive + case tmpdms.VersionCompleteFailure, tmpdms.VersionCompleteForceAbort, tmpdms.VersionCompleteLeaseExpired: + deployment.Status = tmpdms.DeploymentStatusFailed + case tmpdms.VersionCompleteUnspecified: + // No status change for unspecified completion reason. + } + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + } + + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + // resource_key is a query parameter, not in the body. + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "resource_key is required"}, + } + } + + // The body maps to the Operation sub-message. + var bodyOperation tmpdms.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyOperation); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + now := time.Now().UTC() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + operation := tmpdms.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: &now, + ActionType: bodyOperation.ActionType, + State: bodyOperation.State, + ResourceID: bodyOperation.ResourceID, + Status: bodyOperation.Status, + ErrorMessage: bodyOperation.ErrorMessage, + } + + state.operations[opKey] = operation + + // Upsert the deployment-level resource. + resKey := deploymentID + "/" + resourceKey + resource := tmpdms.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: bodyOperation.State, + ResourceID: bodyOperation.ResourceID, + LastActionType: bodyOperation.ActionType, + LastVersionID: versionID, + } + state.resources[resKey] = resource + + return Response{Body: operation} +} + +func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []tmpdms.Resource + for key, resource := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, resource) + } + } + if resources == nil { + resources = []tmpdms.Resource{} + } + return Response{Body: tmpdms.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index ff70f6b0505..3dcdabdbf2d 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -186,6 +186,8 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -316,6 +318,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index b1ec9b2e3d8..6b87f18b17e 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -1034,4 +1034,42 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Deployment Metadata Service: + + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) } diff --git a/libs/testserver/server.go b/libs/testserver/server.go index 9dbfe32c5fa..8377339f656 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -338,7 +338,7 @@ func (s *Server) serve(w http.ResponseWriter, r *http.Request, handler HandlerFu Body: []byte(rule.Body), Headers: getJsonHeaders(), } - } else if bytes.Contains(request.Body, []byte("INJECT_ERROR")) { + } else if bytes.Contains(request.Body, []byte("INJECT_ERROR")) || strings.Contains(r.URL.Path, "INJECT_ERROR") { resp = EncodedResponse{ StatusCode: 500, Body: []byte("INJECTED"), diff --git a/libs/tmpdms/api.go b/libs/tmpdms/api.go new file mode 100644 index 00000000000..a729553b030 --- /dev/null +++ b/libs/tmpdms/api.go @@ -0,0 +1,143 @@ +package tmpdms + +import ( + "context" + "fmt" + "net/http" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/client" +) + +const basePath = "/api/2.0/bundle" + +// DeploymentMetadataAPI is a client for the Deployment Metadata Service. +// +// This is a temporary implementation that will be replaced by the SDK-generated +// client once the proto definitions land in the Go SDK. The method signatures +// and types are designed to match what the SDK will generate, so migration +// should be a straightforward import path change. +type DeploymentMetadataAPI struct { + api *client.DatabricksClient +} + +func NewDeploymentMetadataAPI(w *databricks.WorkspaceClient) (*DeploymentMetadataAPI, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err) + } + return &DeploymentMetadataAPI{api: apiClient}, nil +} + +func (a *DeploymentMetadataAPI) CreateDeployment(ctx context.Context, request CreateDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := basePath + "/deployments" + query := map[string]any{"deployment_id": request.DeploymentID} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Deployment, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetDeployment(ctx context.Context, request GetDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) DeleteDeployment(ctx context.Context, request DeleteDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateVersion(ctx context.Context, request CreateVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions", basePath, request.DeploymentID) + query := map[string]any{"version_id": request.VersionID} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Version, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetVersion(ctx context.Context, request GetVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) Heartbeat(ctx context.Context, request HeartbeatRequest) (*HeartbeatResponse, error) { + var resp HeartbeatResponse + path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CompleteVersion(ctx context.Context, request CompleteVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateOperation(ctx context.Context, request CreateOperationRequest) (*Operation, error) { + var resp Operation + path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, request.DeploymentID, request.VersionID) + query := map[string]any{"resource_key": request.ResourceKey} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Operation, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) ListResources(ctx context.Context, request ListResourcesRequest) ([]Resource, error) { + var allResources []Resource + pageToken := "" + + for { + var resp ListResourcesResponse + path := fmt.Sprintf("%s/deployments/%s/resources", basePath, request.DeploymentID) + + q := map[string]any{ + "page_size": 1000, + } + if pageToken != "" { + q["page_token"] = pageToken + } + + err := a.api.Do(ctx, http.MethodGet, path, nil, q, nil, &resp) + if err != nil { + return nil, err + } + + allResources = append(allResources, resp.Resources...) + if resp.NextPageToken == "" { + break + } + pageToken = resp.NextPageToken + } + + return allResources, nil +} diff --git a/libs/tmpdms/types.go b/libs/tmpdms/types.go new file mode 100644 index 00000000000..9e4f900c6f8 --- /dev/null +++ b/libs/tmpdms/types.go @@ -0,0 +1,243 @@ +// Package tmpdms is a temporary client library for the Deployment Metadata Service. +// It mirrors the structure that the Databricks Go SDK will eventually generate from +// the service's proto definitions. When the protos land in the SDK, migration should +// be a straightforward import path change. +package tmpdms + +import "time" + +// Enum types matching the proto definitions. +// Values are the proto enum name strings, which is how proto-over-HTTP serializes enums. + +type ( + DeploymentStatus string + VersionStatus string + VersionComplete string + VersionType string + OperationStatus string + OperationActionType string + DeploymentResourceType string + DeploymentMode string +) + +const ( + DeploymentStatusUnspecified DeploymentStatus = "DEPLOYMENT_STATUS_UNSPECIFIED" + DeploymentStatusActive DeploymentStatus = "DEPLOYMENT_STATUS_ACTIVE" + DeploymentStatusFailed DeploymentStatus = "DEPLOYMENT_STATUS_FAILED" + DeploymentStatusInProgress DeploymentStatus = "DEPLOYMENT_STATUS_IN_PROGRESS" + DeploymentStatusDeleted DeploymentStatus = "DEPLOYMENT_STATUS_DELETED" +) + +const ( + VersionStatusUnspecified VersionStatus = "VERSION_STATUS_UNSPECIFIED" + VersionStatusInProgress VersionStatus = "VERSION_STATUS_IN_PROGRESS" + VersionStatusCompleted VersionStatus = "VERSION_STATUS_COMPLETED" +) + +const ( + VersionCompleteUnspecified VersionComplete = "VERSION_COMPLETE_UNSPECIFIED" + VersionCompleteSuccess VersionComplete = "VERSION_COMPLETE_SUCCESS" + VersionCompleteFailure VersionComplete = "VERSION_COMPLETE_FAILURE" + VersionCompleteForceAbort VersionComplete = "VERSION_COMPLETE_FORCE_ABORT" + VersionCompleteLeaseExpired VersionComplete = "VERSION_COMPLETE_LEASE_EXPIRED" +) + +const ( + VersionTypeUnspecified VersionType = "VERSION_TYPE_UNSPECIFIED" + VersionTypeDeploy VersionType = "VERSION_TYPE_DEPLOY" + VersionTypeDestroy VersionType = "VERSION_TYPE_DESTROY" +) + +const ( + DeploymentModeDevelopment DeploymentMode = "DEPLOYMENT_MODE_DEVELOPMENT" + DeploymentModeProduction DeploymentMode = "DEPLOYMENT_MODE_PRODUCTION" +) + +const ( + OperationStatusUnspecified OperationStatus = "OPERATION_STATUS_UNSPECIFIED" + OperationStatusSucceeded OperationStatus = "OPERATION_STATUS_SUCCEEDED" + OperationStatusFailed OperationStatus = "OPERATION_STATUS_FAILED" +) + +const ( + OperationActionTypeUnspecified OperationActionType = "OPERATION_ACTION_TYPE_UNSPECIFIED" + OperationActionTypeResize OperationActionType = "OPERATION_ACTION_TYPE_RESIZE" + OperationActionTypeUpdate OperationActionType = "OPERATION_ACTION_TYPE_UPDATE" + OperationActionTypeUpdateWithID OperationActionType = "OPERATION_ACTION_TYPE_UPDATE_WITH_ID" + OperationActionTypeCreate OperationActionType = "OPERATION_ACTION_TYPE_CREATE" + OperationActionTypeRecreate OperationActionType = "OPERATION_ACTION_TYPE_RECREATE" + OperationActionTypeDelete OperationActionType = "OPERATION_ACTION_TYPE_DELETE" + OperationActionTypeBind OperationActionType = "OPERATION_ACTION_TYPE_BIND" + OperationActionTypeBindAndUpdate OperationActionType = "OPERATION_ACTION_TYPE_BIND_AND_UPDATE" + OperationActionTypeInitRegister OperationActionType = "OPERATION_ACTION_TYPE_INITIAL_REGISTER" +) + +const ( + ResourceTypeUnspecified DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_UNSPECIFIED" + ResourceTypeJob DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_JOB" + ResourceTypePipeline DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_PIPELINE" + ResourceTypeModel DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_MODEL" + ResourceTypeRegisteredModel DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_REGISTERED_MODEL" + ResourceTypeExperiment DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_EXPERIMENT" + ResourceTypeServingEndpoint DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_MODEL_SERVING_ENDPOINT" + ResourceTypeQualityMonitor DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_QUALITY_MONITOR" + ResourceTypeSchema DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SCHEMA" + ResourceTypeVolume DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_VOLUME" + ResourceTypeCluster DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_CLUSTER" + ResourceTypeDashboard DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DASHBOARD" + ResourceTypeApp DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_APP" + ResourceTypeCatalog DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_CATALOG" + ResourceTypeExternalLocation DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_EXTERNAL_LOCATION" + ResourceTypeSecretScope DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SECRET_SCOPE" + ResourceTypeAlert DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_ALERT" + ResourceTypeSQLWarehouse DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SQL_WAREHOUSE" + ResourceTypeDatabaseInstance DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DATABASE_INSTANCE" + ResourceTypeDatabaseCatalog DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DATABASE_CATALOG" + ResourceTypeSyncedDBTable DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SYNCED_DATABASE_TABLE" + ResourceTypePostgresProject DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_PROJECT" + ResourceTypePostgresBranch DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_BRANCH" + ResourceTypePostgresEndpoint DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_ENDPOINT" +) + +// Resource types (proto message equivalents). + +type Deployment struct { + Name string `json:"name,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` + Status DeploymentStatus `json:"status,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + UpdateTime *time.Time `json:"update_time,omitempty"` + DestroyTime *time.Time `json:"destroy_time,omitempty"` + DestroyedBy string `json:"destroyed_by,omitempty"` +} + +type Version struct { + Name string `json:"name,omitempty"` + VersionID string `json:"version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + CompleteTime *time.Time `json:"complete_time,omitempty"` + CliVersion string `json:"cli_version,omitempty"` + Status VersionStatus `json:"status,omitempty"` + VersionType VersionType `json:"version_type,omitempty"` + CompletionReason VersionComplete `json:"completion_reason,omitempty"` + CompletedBy string `json:"completed_by,omitempty"` + DeploymentMode DeploymentMode `json:"deployment_mode,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` + GitInfo *GitInfo `json:"git_info,omitempty"` +} + +// GitInfo is the git provenance snapshot recorded on a version. It mirrors the +// GitInfo proto message in the deployment metadata service and carries the same +// values the CLI writes to metadata.json. +type GitInfo struct { + OriginURL string `json:"origin_url,omitempty"` + Branch string `json:"branch,omitempty"` + Commit string `json:"commit,omitempty"` +} + +type Operation struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + ActionType OperationActionType `json:"action_type,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + Status OperationStatus `json:"status,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type Resource struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + LastActionType OperationActionType `json:"last_action_type,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + ResourceType DeploymentResourceType `json:"resource_type,omitempty"` +} + +// Request types. + +type CreateDeploymentRequest struct { + DeploymentID string `json:"deployment_id"` + Deployment *Deployment `json:"deployment"` +} + +type GetDeploymentRequest struct { + DeploymentID string `json:"-"` +} + +type DeleteDeploymentRequest struct { + DeploymentID string `json:"-"` +} + +type CreateVersionRequest struct { + DeploymentID string `json:"-"` + Parent string `json:"parent"` + Version *Version `json:"version"` + VersionID string `json:"version_id"` +} + +type GetVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` +} + +type HeartbeatRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` +} + +type CompleteVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` + Name string `json:"name"` + CompletionReason VersionComplete `json:"completion_reason"` + Force bool `json:"force,omitempty"` +} + +type CreateOperationRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` + Parent string `json:"parent"` + ResourceKey string `json:"resource_key"` + Operation *Operation `json:"operation"` +} + +type ListResourcesRequest struct { + DeploymentID string `json:"-"` + Parent string `json:"parent"` + PageSize int `json:"page_size,omitempty"` + PageToken string `json:"page_token,omitempty"` +} + +// Response types. + +type HeartbeatResponse struct { + ExpireTime *time.Time `json:"expire_time,omitempty"` +} + +type ListDeploymentsResponse struct { + Deployments []Deployment `json:"deployments"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListVersionsResponse struct { + Versions []Version `json:"versions"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListOperationsResponse struct { + Operations []Operation `json:"operations"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListResourcesResponse struct { + Resources []Resource `json:"resources"` + NextPageToken string `json:"next_page_token,omitempty"` +}