From 607bdd0aabb62d63b9e474a086efce8bf7542b08 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 1 Jun 2026 17:47:03 +0200 Subject: [PATCH] bundle/deploy/lock: record deployment history in DMS alongside the file lock Records each deploy/destroy as a version with the Deployment Metadata Service (DMS), gated by experimental.record_deployment_history. No lock abstraction: the existing workspace-filesystem lock mutators (lock.Acquire / lock.Release) are used as-is, and DMS version recording runs alongside them in the deploy and destroy phases. - deployment_metadata_service.go: DeploymentVersionRecorder with CreateVersion / CompleteVersion (no-ops unless the flag is set). The deployment ID is the state lineage read from resources.json, so a bundle deployment maps one-to-one to a DMS deployment record. GetDeployment first, CreateDeployment only when missing, then create the next version (server validates version_id == last_version_id + 1). A heartbeat goroutine keeps the version's lease alive; CompleteVersion records success/failure and, on destroy + success, deletes the deployment record. Uses w.Bundle from the SDK. - direct/dstate: GetOrInitLineage returns the state lineage, generating and storing one on the first deploy so DMS and the deployment engine use the same lineage value as the deployment ID (DMS records the version at lock time, before the engine assigns the lineage at plan-apply time). - phases/deploy.go, phases/destroy.go: after acquiring the file lock, create a version; complete it in the deferred release before unlocking. - libs/testserver: in-memory DMS handlers (/api/2.0/bundle/...) for acceptance. - acceptance/bundle/dms/record: asserts that a deploy creates the DMS deployment + version and applies the workspace-filesystem lock (direct only). Co-authored-by: Shreyas Goenka --- .../dms/enable-after-deploy/databricks.yml | 7 + .../dms/enable-after-deploy/out.test.toml | 3 + .../bundle/dms/enable-after-deploy/output.txt | 88 +++++++ .../bundle/dms/enable-after-deploy/script | 25 ++ acceptance/bundle/dms/record/databricks.yml | 10 + acceptance/bundle/dms/record/out.test.toml | 3 + acceptance/bundle/dms/record/output.txt | 148 +++++++++++ acceptance/bundle/dms/record/script | 23 ++ acceptance/bundle/dms/test.toml | 5 + .../lock/deployment_metadata_service.go | 234 ++++++++++++++++++ bundle/direct/dstate/state.go | 20 ++ bundle/phases/deploy.go | 15 +- bundle/phases/destroy.go | 14 ++ libs/testserver/bundle.go | 106 ++++++++ libs/testserver/fake_workspace.go | 5 + libs/testserver/handlers.go | 20 ++ 16 files changed, 725 insertions(+), 1 deletion(-) create mode 100644 acceptance/bundle/dms/enable-after-deploy/databricks.yml create mode 100644 acceptance/bundle/dms/enable-after-deploy/out.test.toml create mode 100644 acceptance/bundle/dms/enable-after-deploy/output.txt create mode 100644 acceptance/bundle/dms/enable-after-deploy/script create mode 100644 acceptance/bundle/dms/record/databricks.yml create mode 100644 acceptance/bundle/dms/record/out.test.toml create mode 100644 acceptance/bundle/dms/record/output.txt create mode 100644 acceptance/bundle/dms/record/script create mode 100644 acceptance/bundle/dms/test.toml create mode 100644 bundle/deploy/lock/deployment_metadata_service.go create mode 100644 libs/testserver/bundle.go diff --git a/acceptance/bundle/dms/enable-after-deploy/databricks.yml b/acceptance/bundle/dms/enable-after-deploy/databricks.yml new file mode 100644 index 00000000000..cd8ff90b266 --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-enable-after-deploy + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/enable-after-deploy/out.test.toml b/acceptance/bundle/dms/enable-after-deploy/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/enable-after-deploy/output.txt b/acceptance/bundle/dms/enable-after-deploy/output.txt new file mode 100644 index 00000000000..0d9addaa6ee --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/output.txt @@ -0,0 +1,88 @@ + +=== Deploy without the feature: no DMS calls are made +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Enable experimental.record_deployment_history +=== Delete local cache (.databricks) so the next deploy cannot rely on it +=== Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default + +Deleting files... +Destroy complete! diff --git a/acceptance/bundle/dms/enable-after-deploy/script b/acceptance/bundle/dms/enable-after-deploy/script new file mode 100644 index 00000000000..c17b3b859dc --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/script @@ -0,0 +1,25 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy without the feature: no DMS calls are made" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock + +title "Enable experimental.record_deployment_history" +cat >> databricks.yml <<'YAML' + +experimental: + record_deployment_history: true +YAML + +title "Delete local cache (.databricks) so the next deploy cannot rely on it" +rm -rf .databricks + +title "Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache)" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/record/databricks.yml b/acceptance/bundle/dms/record/databricks.yml new file mode 100644 index 00000000000..b20e6274310 --- /dev/null +++ b/acceptance/bundle/dms/record/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-record + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/record/out.test.toml b/acceptance/bundle/dms/record/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/record/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/record/output.txt b/acceptance/bundle/dms/record/output.txt new file mode 100644 index 00000000000..ea51648031d --- /dev/null +++ b/acceptance/bundle/dms/record/output.txt @@ -0,0 +1,148 @@ + +=== Deploy: a DMS deployment + version are recorded while the file lock is held +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== DMS API calls made during deploy (create deployment, create + complete version) +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +=== The workspace-filesystem lock is applied alongside DMS (deploy.lock written) +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Destroy: a DMS version is recorded while the file lock is held +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-record/default + +Deleting files... +Destroy complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DESTROY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "DELETE", + "path": "/api/2.0/bundle/deployments/[UUID]" +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} diff --git a/acceptance/bundle/dms/record/script b/acceptance/bundle/dms/record/script new file mode 100644 index 00000000000..afb9b05bdc3 --- /dev/null +++ b/acceptance/bundle/dms/record/script @@ -0,0 +1,23 @@ +cleanup() { + title "Destroy: a DMS version is recorded while the file lock is held" + trace $CLI bundle destroy --auto-approve + trace print_requests.py //api/2.0/bundle --keep + trace print_requests.py //deploy.lock + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy: a DMS deployment + version are recorded while the file lock is held" +trace $CLI bundle deploy + +title "DMS API calls made during deploy (create deployment, create + complete version)" +trace print_requests.py //api/2.0/bundle --keep + +title "The workspace-filesystem lock is applied alongside DMS (deploy.lock written)" +trace print_requests.py //deploy.lock + +title "Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment)" +rm -rf .databricks +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..84b939a7da2 --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,5 @@ +# Deployment Metadata Service (DMS) recording is only meaningful in the direct +# engine, where the deployment ID is derived from the state lineage. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] + +RecordRequests = true diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..1c5159da5a2 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,234 @@ +package lock + +import ( + "context" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// The server expires a version's lease if it does not receive a heartbeat +// within a 2-minute TTL; we heartbeat well inside that window. +const defaultHeartbeatInterval = 30 * time.Second + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentVersionRecorder records each deploy/destroy as a version with the +// Deployment Metadata Service (DMS). It runs alongside the workspace-filesystem +// lock (lock.Acquire / lock.Release): a version is created after the lock is +// acquired and completed before it is released. +// +// Recording is gated by experimental.record_deployment_history; when disabled, +// CreateVersion and CompleteVersion are no-ops. The deployment ID is the state +// lineage (resources.json), so a bundle deployment maps one-to-one to a DMS +// deployment record. +type DeploymentVersionRecorder struct { + b *bundle.Bundle + goal Goal + enabled bool + + // populated by CreateVersion + svc sdkbundle.BundleInterface + deploymentID string + versionNum int64 + stopHeartbeat context.CancelFunc +} + +// NewDeploymentVersionRecorder returns a recorder for the given goal. The +// returned recorder is a no-op unless experimental.record_deployment_history +// is set. +func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal) *DeploymentVersionRecorder { + enabled := b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory + return &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled} +} + +// CreateVersion registers a new deployment version with DMS, claiming it for the +// duration of the deployment. No-op when recording is disabled. +func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error { + if !r.enabled { + return nil + } + + versionType, ok := goalToVersionType(r.goal) + if !ok { + return fmt.Errorf("%s is not supported with the deployment metadata service", r.goal) + } + + r.svc = r.b.WorkspaceClient(ctx).Bundle + + // The deployment ID is the state lineage. GetOrInitLineage generates one on + // the first deploy and stores it so the deploy persists the same value. + r.deploymentID = r.b.DeploymentBundle.StateDB.GetOrInitLineage() + + versionID, err := createDeploymentVersion(ctx, r.b, r.svc, r.deploymentID, versionType) + if err != nil { + return err + } + + versionNum, err := strconv.ParseInt(versionID, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse version ID %q: %w", versionID, err) + } + r.versionNum = versionNum + r.stopHeartbeat = startHeartbeat(ctx, r.svc, r.deploymentID, versionID) + return nil +} + +// CompleteVersion finalizes the version created by CreateVersion. No-op when +// recording is disabled or no version was created. +func (r *DeploymentVersionRecorder) CompleteVersion(ctx context.Context, status DeploymentStatus) error { + if !r.enabled || r.svc == nil { + return nil + } + + if r.stopHeartbeat != nil { + r.stopHeartbeat() + } + + versionIDStr := strconv.FormatInt(r.versionNum, 10) + versionName := fmt.Sprintf("deployments/%s/versions/%s", r.deploymentID, versionIDStr) + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + _, err := r.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }) + if err != nil { + return err + } + log.Infof(ctx, "Completed deployment version: deployment=%s version=%s reason=%s", r.deploymentID, versionIDStr, reason) + + // For destroy operations, delete the deployment record after the version + // completes successfully. + if status == DeploymentSuccess && r.goal == GoalDestroy { + err = r.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + r.deploymentID, + }) + if err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + + return nil +} + +// createDeploymentVersion ensures the deployment record exists, then creates a +// new version. The deployment ID is the state lineage: we GetDeployment first +// and only CreateDeployment when it does not exist yet. +func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType) (versionID string, err error) { + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + switch { + case errors.Is(getErr, apierr.ErrNotFound): + // Fresh deployment: create the record and start at version 1. + _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", fmt.Errorf("failed to create deployment: %w", createErr) + } + versionID = "1" + case getErr != nil: + return "", fmt.Errorf("failed to get deployment: %w", getErr) + default: + // Existing deployment: increment the last version to get the next one. + lastVersion, parseErr := strconv.ParseInt(dep.LastVersionId, 10, 64) + if parseErr != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", dep.LastVersionId, parseErr) + } + versionID = strconv.FormatInt(lastVersion+1, 10) + } + + // The server validates that versionID equals last_version_id + 1 and returns + // ABORTED otherwise (e.g. a concurrent deploy already created this version). + version, versionErr := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }) + if versionErr != nil { + return "", fmt.Errorf("failed to create deployment version: %w", versionErr) + } + + log.Infof(ctx, "Created deployment version: deployment=%s version=%s", deploymentID, version.VersionId) + return versionID, nil +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment version's lease alive. Returns a cancel function to stop it. +func startHeartbeat(ctx context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}) + if err != nil { + // A 409 ABORTED is expected if the version was completed + // between the ticker firing and the heartbeat. + if isAbortedErr(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent: deployment=%s version=%s", deploymentID, versionID) + } + } + } + }() + + return cancel +} + +// isAbortedErr reports whether err is an HTTP 409 ABORTED from the DMS API. +func isAbortedErr(err error) bool { + apiErr, ok := errors.AsType[*apierr.APIError](err) + return ok && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" +} + +// goalToVersionType maps a deployment goal to a DMS VersionType. +// Returns false for goals not supported by the metadata service (bind/unbind). +func goalToVersionType(goal Goal) (sdkbundle.VersionType, bool) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, true + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, true + default: + return "", false + } +} diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 5b2a70adbb3..b87166df60b 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -147,6 +147,26 @@ func (db *DeploymentState) GetResourceID(key string) string { return db.stateIDs[key] } +// GetOrInitLineage returns the state lineage, generating and storing a new one +// in memory if the state does not have a lineage yet (fresh deployment). Storing +// it in Data.Lineage means a subsequent UpgradeToWrite persists the same value, +// so the lineage observed here matches the one the deployment engine later writes +// to the state file. +// +// This lets DMS use the same lineage value as the deployment engine for the +// deployment ID. It is necessary because DMS creates the version/deployment at +// lock time, which is before the engine assigns the lineage at plan-apply time; +// without seeding it here the two would diverge on the first deploy. +func (db *DeploymentState) GetOrInitLineage() string { + db.mu.Lock() + defer db.mu.Unlock() + + if db.Data.Lineage == "" { + db.Data.Lineage = uuid.New().String() + } + return db.Data.Lineage +} + type ( // If true, then Open reads the WAL and merges it in the state. If false, and WAL is present, Open returns an error. WithRecovery bool diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 15546880b9a..b3c6f3c682d 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -136,10 +136,23 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - // lock is acquired here + // lock is acquired here. Record a DMS deployment version while the lock is + // held (no-op unless experimental.record_deployment_history is set). + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy) defer func() { + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := recorder.CompleteVersion(ctx, status); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) }() + if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } uploadLibraries(ctx, b, libs) if logdiag.HasError(ctx) { diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 95eec600dc2..7a078f6ed6a 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -125,9 +125,23 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } + // Record a DMS deployment version while the lock is held (no-op unless + // experimental.record_deployment_history is set). + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy) defer func() { + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := recorder.CompleteVersion(ctx, status); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) }() + if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } if !engine.IsDirect() { bundle.ApplySeqContext(ctx, b, diff --git a/libs/testserver/bundle.go b/libs/testserver/bundle.go new file mode 100644 index 00000000000..a59da9cce13 --- /dev/null +++ b/libs/testserver/bundle.go @@ -0,0 +1,106 @@ +package testserver + +import ( + "encoding/json" + "strconv" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// Handlers for the Deployment Metadata Service (DMS) API under /api/2.0/bundle. +// State is kept in FakeWorkspace.Deployments, keyed by deployment ID. + +func (s *FakeWorkspace) BundleCreateDeployment(req Request) Response { + deploymentID := req.URL.Query().Get("deployment_id") + + var dep sdkbundle.Deployment + if err := json.Unmarshal(req.Body, &dep); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep.Name = "deployments/" + deploymentID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + s.Deployments[deploymentID] = &dep + return Response{Body: dep} +} + +func (s *FakeWorkspace) BundleGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{ + "error_code": "RESOURCE_DOES_NOT_EXIST", + "message": "deployment " + deploymentID + " does not exist", + }, + } + } + return Response{Body: *dep} +} + +func (s *FakeWorkspace) BundleDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + delete(s.Deployments, deploymentID) + return Response{Body: map[string]any{}} +} + +func (s *FakeWorkspace) BundleCreateVersion(req Request, deploymentID string) Response { + versionID := req.URL.Query().Get("version_id") + + var version sdkbundle.Version + if err := json.Unmarshal(req.Body, &version); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{"error_code": "RESOURCE_DOES_NOT_EXIST", "message": "deployment does not exist"}, + } + } + + // Mirror the server-side optimistic concurrency check: the new version must + // be exactly last_version_id + 1. + last, _ := strconv.ParseInt(dep.LastVersionId, 10, 64) + want := strconv.FormatInt(last+1, 10) + if dep.LastVersionId == "" { + want = "1" + } + if versionID != want { + return Response{ + StatusCode: 409, + Headers: map[string][]string{"Content-Type": {"application/json"}}, + Body: map[string]string{"error_code": "ABORTED", "message": "expected version " + want + ", got " + versionID}, + } + } + + dep.LastVersionId = versionID + version.Name = "deployments/" + deploymentID + "/versions/" + versionID + version.VersionId = versionID + version.Status = sdkbundle.VersionStatusVersionStatusInProgress + return Response{Body: version} +} + +func (s *FakeWorkspace) BundleCompleteVersion(req Request, deploymentID, versionID string) Response { + var completeReq sdkbundle.CompleteVersionRequest + _ = json.Unmarshal(req.Body, &completeReq) + + return Response{Body: sdkbundle.Version{ + Name: "deployments/" + deploymentID + "/versions/" + versionID, + VersionId: versionID, + Status: sdkbundle.VersionStatusVersionStatusCompleted, + CompletionReason: completeReq.CompletionReason, + }} +} + +func (s *FakeWorkspace) BundleHeartbeat() Response { + return Response{Body: sdkbundle.HeartbeatResponse{}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index ff70f6b0505..3f3b3fa5bfe 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/databricks/databricks-sdk-go/service/apps" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -176,6 +177,9 @@ type FakeWorkspace struct { PostgresSyncedTables map[string]postgres.SyncedTable PostgresOperations map[string]postgres.Operation + // Deployment Metadata Service (DMS) deployment records, keyed by deployment ID. + Deployments map[string]*sdkbundle.Deployment + // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the // primary endpoint on a new branch). The real backend rejects independent @@ -313,6 +317,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresCatalogs: map[string]postgres.Catalog{}, PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, + Deployments: map[string]*sdkbundle.Deployment{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 659f6e010ff..330fedd5750 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -222,6 +222,26 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.JobsCreate(req) }) + // Deployment Metadata Service (DMS) endpoints. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.BundleCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.BundleGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.BundleDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.BundleCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.BundleCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.BundleHeartbeat() + }) + server.Handle("POST", "/api/2.2/jobs/delete", func(req Request) any { var request jobs.DeleteJob if err := json.Unmarshal(req.Body, &request); err != nil {