diff --git a/acceptance/bundle/dms/enable-after-deploy/databricks.yml b/acceptance/bundle/dms/enable-after-deploy/databricks.yml new file mode 100644 index 00000000000..cd8ff90b266 --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: dms-enable-after-deploy + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/enable-after-deploy/out.test.toml b/acceptance/bundle/dms/enable-after-deploy/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/enable-after-deploy/output.txt b/acceptance/bundle/dms/enable-after-deploy/output.txt new file mode 100644 index 00000000000..0d9addaa6ee --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/output.txt @@ -0,0 +1,88 @@ + +=== Deploy without the feature: no DMS calls are made +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Enable experimental.record_deployment_history +=== Delete local cache (.databricks) so the next deploy cannot rely on it +=== Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-enable-after-deploy/default + +Deleting files... +Destroy complete! diff --git a/acceptance/bundle/dms/enable-after-deploy/script b/acceptance/bundle/dms/enable-after-deploy/script new file mode 100644 index 00000000000..c17b3b859dc --- /dev/null +++ b/acceptance/bundle/dms/enable-after-deploy/script @@ -0,0 +1,25 @@ +cleanup() { + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy without the feature: no DMS calls are made" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock + +title "Enable experimental.record_deployment_history" +cat >> databricks.yml <<'YAML' + +experimental: + record_deployment_history: true +YAML + +title "Delete local cache (.databricks) so the next deploy cannot rely on it" +rm -rf .databricks + +title "Next deploy creates a DMS deployment + version 1 (lineage recovered from remote state, not local cache)" +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/record/databricks.yml b/acceptance/bundle/dms/record/databricks.yml new file mode 100644 index 00000000000..b20e6274310 --- /dev/null +++ b/acceptance/bundle/dms/record/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-record + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/record/out.test.toml b/acceptance/bundle/dms/record/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/record/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/record/output.txt b/acceptance/bundle/dms/record/output.txt new file mode 100644 index 00000000000..ea51648031d --- /dev/null +++ b/acceptance/bundle/dms/record/output.txt @@ -0,0 +1,148 @@ + +=== Deploy: a DMS deployment + version are recorded while the file lock is held +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== DMS API calls made during deploy (create deployment, create + complete version) +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +=== The workspace-filesystem lock is applied alongside DMS (deploy.lock written) +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment) +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-record/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DEPLOY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} + +=== Destroy: a DMS version is recorded while the file lock is held +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-record/default + +Deleting files... +Destroy complete! + +>>> print_requests.py //api/2.0/bundle --keep +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "target_name": "default", + "version_type": "VERSION_TYPE_DESTROY" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "DELETE", + "path": "/api/2.0/bundle/deployments/[UUID]" +} + +>>> print_requests.py //deploy.lock +{ + "method": "POST", + "path": "/api/2.0/workspace-files/import-file/Workspace/Users/[USERNAME]/.bundle/dms-record/default/state/deploy.lock", + "q": { + "overwrite": "false" + }, + "body": { + "ID": "[UUID]", + "AcquisitionTime": "[TIMESTAMP]", + "IsForced": false, + "User": "[USERNAME]" + } +} diff --git a/acceptance/bundle/dms/record/script b/acceptance/bundle/dms/record/script new file mode 100644 index 00000000000..afb9b05bdc3 --- /dev/null +++ b/acceptance/bundle/dms/record/script @@ -0,0 +1,23 @@ +cleanup() { + title "Destroy: a DMS version is recorded while the file lock is held" + trace $CLI bundle destroy --auto-approve + trace print_requests.py //api/2.0/bundle --keep + trace print_requests.py //deploy.lock + rm -f out.requests.txt +} +trap cleanup EXIT + +title "Deploy: a DMS deployment + version are recorded while the file lock is held" +trace $CLI bundle deploy + +title "DMS API calls made during deploy (create deployment, create + complete version)" +trace print_requests.py //api/2.0/bundle --keep + +title "The workspace-filesystem lock is applied alongside DMS (deploy.lock written)" +trace print_requests.py //deploy.lock + +title "Redeploy after deleting local cache (.databricks): the lineage is recovered from remote state, so the same deployment is reused and the version increments (no new CreateDeployment)" +rm -rf .databricks +trace $CLI bundle deploy +trace print_requests.py //api/2.0/bundle --keep +trace print_requests.py //deploy.lock diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 00000000000..84b939a7da2 --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,5 @@ +# Deployment Metadata Service (DMS) recording is only meaningful in the direct +# engine, where the deployment ID is derived from the state lineage. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] + +RecordRequests = true diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 00000000000..1c5159da5a2 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,234 @@ +package lock + +import ( + "context" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// The server expires a version's lease if it does not receive a heartbeat +// within a 2-minute TTL; we heartbeat well inside that window. +const defaultHeartbeatInterval = 30 * time.Second + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentVersionRecorder records each deploy/destroy as a version with the +// Deployment Metadata Service (DMS). It runs alongside the workspace-filesystem +// lock (lock.Acquire / lock.Release): a version is created after the lock is +// acquired and completed before it is released. +// +// Recording is gated by experimental.record_deployment_history; when disabled, +// CreateVersion and CompleteVersion are no-ops. The deployment ID is the state +// lineage (resources.json), so a bundle deployment maps one-to-one to a DMS +// deployment record. +type DeploymentVersionRecorder struct { + b *bundle.Bundle + goal Goal + enabled bool + + // populated by CreateVersion + svc sdkbundle.BundleInterface + deploymentID string + versionNum int64 + stopHeartbeat context.CancelFunc +} + +// NewDeploymentVersionRecorder returns a recorder for the given goal. The +// returned recorder is a no-op unless experimental.record_deployment_history +// is set. +func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal) *DeploymentVersionRecorder { + enabled := b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory + return &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled} +} + +// CreateVersion registers a new deployment version with DMS, claiming it for the +// duration of the deployment. No-op when recording is disabled. +func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error { + if !r.enabled { + return nil + } + + versionType, ok := goalToVersionType(r.goal) + if !ok { + return fmt.Errorf("%s is not supported with the deployment metadata service", r.goal) + } + + r.svc = r.b.WorkspaceClient(ctx).Bundle + + // The deployment ID is the state lineage. GetOrInitLineage generates one on + // the first deploy and stores it so the deploy persists the same value. + r.deploymentID = r.b.DeploymentBundle.StateDB.GetOrInitLineage() + + versionID, err := createDeploymentVersion(ctx, r.b, r.svc, r.deploymentID, versionType) + if err != nil { + return err + } + + versionNum, err := strconv.ParseInt(versionID, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse version ID %q: %w", versionID, err) + } + r.versionNum = versionNum + r.stopHeartbeat = startHeartbeat(ctx, r.svc, r.deploymentID, versionID) + return nil +} + +// CompleteVersion finalizes the version created by CreateVersion. No-op when +// recording is disabled or no version was created. +func (r *DeploymentVersionRecorder) CompleteVersion(ctx context.Context, status DeploymentStatus) error { + if !r.enabled || r.svc == nil { + return nil + } + + if r.stopHeartbeat != nil { + r.stopHeartbeat() + } + + versionIDStr := strconv.FormatInt(r.versionNum, 10) + versionName := fmt.Sprintf("deployments/%s/versions/%s", r.deploymentID, versionIDStr) + + reason := sdkbundle.VersionCompleteVersionCompleteSuccess + if status == DeploymentFailure { + reason = sdkbundle.VersionCompleteVersionCompleteFailure + } + + _, err := r.svc.CompleteVersion(ctx, sdkbundle.CompleteVersionRequest{ + Name: versionName, + CompletionReason: reason, + }) + if err != nil { + return err + } + log.Infof(ctx, "Completed deployment version: deployment=%s version=%s reason=%s", r.deploymentID, versionIDStr, reason) + + // For destroy operations, delete the deployment record after the version + // completes successfully. + if status == DeploymentSuccess && r.goal == GoalDestroy { + err = r.svc.DeleteDeployment(ctx, sdkbundle.DeleteDeploymentRequest{ + Name: "deployments/" + r.deploymentID, + }) + if err != nil { + return fmt.Errorf("failed to delete deployment: %w", err) + } + } + + return nil +} + +// createDeploymentVersion ensures the deployment record exists, then creates a +// new version. The deployment ID is the state lineage: we GetDeployment first +// and only CreateDeployment when it does not exist yet. +func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType) (versionID string, err error) { + dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + deploymentID, + }) + switch { + case errors.Is(getErr, apierr.ErrNotFound): + // Fresh deployment: create the record and start at version 1. + _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ + DeploymentId: deploymentID, + Deployment: sdkbundle.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil { + return "", fmt.Errorf("failed to create deployment: %w", createErr) + } + versionID = "1" + case getErr != nil: + return "", fmt.Errorf("failed to get deployment: %w", getErr) + default: + // Existing deployment: increment the last version to get the next one. + lastVersion, parseErr := strconv.ParseInt(dep.LastVersionId, 10, 64) + if parseErr != nil { + return "", fmt.Errorf("failed to parse last_version_id %q: %w", dep.LastVersionId, parseErr) + } + versionID = strconv.FormatInt(lastVersion+1, 10) + } + + // The server validates that versionID equals last_version_id + 1 and returns + // ABORTED otherwise (e.g. a concurrent deploy already created this version). + version, versionErr := svc.CreateVersion(ctx, sdkbundle.CreateVersionRequest{ + Parent: "deployments/" + deploymentID, + VersionId: versionID, + Version: sdkbundle.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }) + if versionErr != nil { + return "", fmt.Errorf("failed to create deployment version: %w", versionErr) + } + + log.Infof(ctx, "Created deployment version: deployment=%s version=%s", deploymentID, version.VersionId) + return versionID, nil +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment version's lease alive. Returns a cancel function to stop it. +func startHeartbeat(ctx context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + versionName := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := svc.Heartbeat(ctx, sdkbundle.HeartbeatRequest{Name: versionName}) + if err != nil { + // A 409 ABORTED is expected if the version was completed + // between the ticker firing and the heartbeat. + if isAbortedErr(err) { + log.Debugf(ctx, "Heartbeat stopped: version already completed") + return + } + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent: deployment=%s version=%s", deploymentID, versionID) + } + } + } + }() + + return cancel +} + +// isAbortedErr reports whether err is an HTTP 409 ABORTED from the DMS API. +func isAbortedErr(err error) bool { + apiErr, ok := errors.AsType[*apierr.APIError](err) + return ok && apiErr.StatusCode == http.StatusConflict && apiErr.ErrorCode == "ABORTED" +} + +// goalToVersionType maps a deployment goal to a DMS VersionType. +// Returns false for goals not supported by the metadata service (bind/unbind). +func goalToVersionType(goal Goal) (sdkbundle.VersionType, bool) { + switch goal { + case GoalDeploy: + return sdkbundle.VersionTypeVersionTypeDeploy, true + case GoalDestroy: + return sdkbundle.VersionTypeVersionTypeDestroy, true + default: + return "", false + } +} diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index 5b2a70adbb3..b87166df60b 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -147,6 +147,26 @@ func (db *DeploymentState) GetResourceID(key string) string { return db.stateIDs[key] } +// GetOrInitLineage returns the state lineage, generating and storing a new one +// in memory if the state does not have a lineage yet (fresh deployment). Storing +// it in Data.Lineage means a subsequent UpgradeToWrite persists the same value, +// so the lineage observed here matches the one the deployment engine later writes +// to the state file. +// +// This lets DMS use the same lineage value as the deployment engine for the +// deployment ID. It is necessary because DMS creates the version/deployment at +// lock time, which is before the engine assigns the lineage at plan-apply time; +// without seeding it here the two would diverge on the first deploy. +func (db *DeploymentState) GetOrInitLineage() string { + db.mu.Lock() + defer db.mu.Unlock() + + if db.Data.Lineage == "" { + db.Data.Lineage = uuid.New().String() + } + return db.Data.Lineage +} + type ( // If true, then Open reads the WAL and merges it in the state. If false, and WAL is present, Open returns an error. WithRecovery bool diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 15546880b9a..b3c6f3c682d 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -136,10 +136,23 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand return } - // lock is acquired here + // lock is acquired here. Record a DMS deployment version while the lock is + // held (no-op unless experimental.record_deployment_history is set). + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy) defer func() { + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := recorder.CompleteVersion(ctx, status); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) }() + if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } uploadLibraries(ctx, b, libs) if logdiag.HasError(ctx) { diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 95eec600dc2..7a078f6ed6a 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -125,9 +125,23 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { return } + // Record a DMS deployment version while the lock is held (no-op unless + // experimental.record_deployment_history is set). + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy) defer func() { + status := lock.DeploymentSuccess + if logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := recorder.CompleteVersion(ctx, status); err != nil { + logdiag.LogError(ctx, err) + } bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) }() + if err := recorder.CreateVersion(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } if !engine.IsDirect() { bundle.ApplySeqContext(ctx, b, diff --git a/libs/testserver/bundle.go b/libs/testserver/bundle.go new file mode 100644 index 00000000000..a59da9cce13 --- /dev/null +++ b/libs/testserver/bundle.go @@ -0,0 +1,106 @@ +package testserver + +import ( + "encoding/json" + "strconv" + + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" +) + +// Handlers for the Deployment Metadata Service (DMS) API under /api/2.0/bundle. +// State is kept in FakeWorkspace.Deployments, keyed by deployment ID. + +func (s *FakeWorkspace) BundleCreateDeployment(req Request) Response { + deploymentID := req.URL.Query().Get("deployment_id") + + var dep sdkbundle.Deployment + if err := json.Unmarshal(req.Body, &dep); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep.Name = "deployments/" + deploymentID + dep.Status = sdkbundle.DeploymentStatusDeploymentStatusActive + s.Deployments[deploymentID] = &dep + return Response{Body: dep} +} + +func (s *FakeWorkspace) BundleGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{ + "error_code": "RESOURCE_DOES_NOT_EXIST", + "message": "deployment " + deploymentID + " does not exist", + }, + } + } + return Response{Body: *dep} +} + +func (s *FakeWorkspace) BundleDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + delete(s.Deployments, deploymentID) + return Response{Body: map[string]any{}} +} + +func (s *FakeWorkspace) BundleCreateVersion(req Request, deploymentID string) Response { + versionID := req.URL.Query().Get("version_id") + + var version sdkbundle.Version + if err := json.Unmarshal(req.Body, &version); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"message": err.Error()}} + } + + defer s.LockUnlock()() + + dep, ok := s.Deployments[deploymentID] + if !ok { + return Response{ + StatusCode: 404, + Body: map[string]string{"error_code": "RESOURCE_DOES_NOT_EXIST", "message": "deployment does not exist"}, + } + } + + // Mirror the server-side optimistic concurrency check: the new version must + // be exactly last_version_id + 1. + last, _ := strconv.ParseInt(dep.LastVersionId, 10, 64) + want := strconv.FormatInt(last+1, 10) + if dep.LastVersionId == "" { + want = "1" + } + if versionID != want { + return Response{ + StatusCode: 409, + Headers: map[string][]string{"Content-Type": {"application/json"}}, + Body: map[string]string{"error_code": "ABORTED", "message": "expected version " + want + ", got " + versionID}, + } + } + + dep.LastVersionId = versionID + version.Name = "deployments/" + deploymentID + "/versions/" + versionID + version.VersionId = versionID + version.Status = sdkbundle.VersionStatusVersionStatusInProgress + return Response{Body: version} +} + +func (s *FakeWorkspace) BundleCompleteVersion(req Request, deploymentID, versionID string) Response { + var completeReq sdkbundle.CompleteVersionRequest + _ = json.Unmarshal(req.Body, &completeReq) + + return Response{Body: sdkbundle.Version{ + Name: "deployments/" + deploymentID + "/versions/" + versionID, + VersionId: versionID, + Status: sdkbundle.VersionStatusVersionStatusCompleted, + CompletionReason: completeReq.CompletionReason, + }} +} + +func (s *FakeWorkspace) BundleHeartbeat() Response { + return Response{Body: sdkbundle.HeartbeatResponse{}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index ff70f6b0505..3f3b3fa5bfe 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/databricks/databricks-sdk-go/service/apps" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -176,6 +177,9 @@ type FakeWorkspace struct { PostgresSyncedTables map[string]postgres.SyncedTable PostgresOperations map[string]postgres.Operation + // Deployment Metadata Service (DMS) deployment records, keyed by deployment ID. + Deployments map[string]*sdkbundle.Deployment + // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the // primary endpoint on a new branch). The real backend rejects independent @@ -313,6 +317,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresCatalogs: map[string]postgres.Catalog{}, PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, + Deployments: map[string]*sdkbundle.Deployment{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, clusterVenvs: map[string]*clusterEnv{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 659f6e010ff..330fedd5750 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -222,6 +222,26 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.JobsCreate(req) }) + // Deployment Metadata Service (DMS) endpoints. + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.BundleCreateDeployment(req) + }) + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.BundleGetDeployment(req.Vars["deployment_id"]) + }) + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.BundleDeleteDeployment(req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.BundleCreateVersion(req, req.Vars["deployment_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.BundleCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.BundleHeartbeat() + }) + server.Handle("POST", "/api/2.2/jobs/delete", func(req Request) any { var request jobs.DeleteJob if err := json.Unmarshal(req.Body, &request); err != nil {