Skip to content

add atlas stream processing commands#3045

Draft
nickpoindexter wants to merge 1 commit into
mongodb:masterfrom
nickpoindexter:add_asp_commands
Draft

add atlas stream processing commands#3045
nickpoindexter wants to merge 1 commit into
mongodb:masterfrom
nickpoindexter:add_asp_commands

Conversation

@nickpoindexter
Copy link
Copy Markdown

Summary

Adds first-class driver support for Atlas Stream Processing (ASP), implementing the ASP driver spec. Users currently have to drop down to
Mongo::Client#database('admin').command(...) against a workspace endpoint; this PR adds a dedicated client / handles layer matching what's already shipped for PHP and Rust.

What's new

Public API (lib/mongo/stream_processing/)

  • Mongo::StreamProcessing::Client — workspace-scoped client distinct from Mongo::Client. Validates the workspace URI (atlas-stream-*.<region>.a.query.mongodb.net and the .mongodb-<env>.net staging variants), enforces
    TLS, defaults auth_source to 'admin'. Static Client.workspace_uri?(uri) exposed for callers that want to gate before connecting.
  • Mongo::StreamProcessing::Processors#create(name, pipeline, **opts), #get(name), #get_info(name) for managing processors in a workspace.
  • Mongo::StreamProcessing::Processor#start(**opts), #stop, #drop, #stats(**opts), #samples(**opts) for a named processor.
  • Mongo::StreamProcessing::ProcessorInfo — typed accessor over the getStreamProcessor response. Fields the spec marks as Optional (e.g. id, pipeline_version) are nullable.
  • Mongo::StreamProcessing::SamplesResult — result of #samples, exposes cursor_id, documents, exhausted?.

Wire commands
All 8 ASP commands routed through Database#command against the admin database: createStreamProcessor, startStreamProcessor, stopStreamProcessor, dropStreamProcessor, getStreamProcessor, getStreamProcessorStats,
startSampleStreamProcessor, getMoreSampleStreamProcessor.

Notable spec / server alignment

  • startAfter (in Processor#start options) is intentionally not serialized — the spec marks it RESERVED for future use and explicitly forbids drivers from sending it.
  • Dev-server response shape deviations are accommodated, matching the workarounds in the PHP and Rust PRs (and the Python POC):
    • Processors#get_info unwraps a top-level result wrapper when the server returns { ok: 1, result: { … } }.
    • Processor#samples accepts both nextBatch (spec) and messages (current dev server).
    • ProcessorInfo#id and #pipeline_version return nil when the server omits them.
  • #samples is a single-command dispatch: absent / zero :cursor_idstartSampleStreamProcessor (returns SamplesResult with cursor id, empty docs); non-zero → getMoreSampleStreamProcessor. Callers stop when
    cursor_id == 0 (or use the #exhausted? predicate).
  • State strings returned as plain strings, not symbols or constants — per the spec, drivers MUST surface unknown state values as-is rather than mapping to a closed set.
  • Internal-only fields (tenantID, projectId, processorId) are not surfaced in the public API.

Test plan

  • bundle exec rubocop lib/mongo/stream_processing*.rb spec/mongo/stream_processing/*.rb spec/integration/stream_processing_spec.rb examples/stream_processing.rb — clean
  • bundle exec rspec spec/mongo/stream_processing/ — 29/29 unit examples pass (URI detection for prod / staging / creds-and-port / case-insensitivity / non-string / four reject cases; constructor rejection paths; full
    ProcessorInfo getter coverage incl. defaults and error fields; SamplesResult predicates)
  • bundle exec rspec spec/integration/stream_processing_spec.rb — self-skips cleanly without MONGODB_STREAM_PROCESSING_URI (1 pending, 0 failures)
  • MONGODB_STREAM_PROCESSING_URI=… bundle exec rspec spec/integration/stream_processing_spec.rb — full lifecycle test against a real workspace endpoint. Needs an Evergreen variant configured with workspace credentials
    before it actually exercises in CI.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant