feat: add streaming request/response support for async API#174

Open
puzza007 wants to merge 3 commits into master from feat/streaming-response

Conversation

@puzza007
Owner

Adds streaming support in both directions for the async API, along with an opaque handle type and metrics parity with sync requests.

Streaming responses let callers receive body data incrementally via {katipo_data, Ref, Chunk} messages by passing stream => true. Streaming request bodies let callers upload data incrementally via send_body/2 and finish_body/1 by passing stream_body => true. Both can be combined for full bidirectional streaming.
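Putting the pieces above together, a consumer loop might look like the following. This is a sketch based on the options and message shapes described in this PR; the `async_get/3` arity and the `stream_to_binary` wrapper are assumptions, and `Pool`/`Url` are placeholders.

```erlang
%% Sketch: collecting a streamed response body into a binary.
stream_to_binary(Pool, Url) ->
    {ok, Handle} = katipo:async_get(Pool, Url, #{stream => true}),
    collect(katipo:ref(Handle), []).

collect(Ref, Acc) ->
    receive
        {katipo_response, Ref, {status, _Status, _Headers, _CookieJar}} ->
            %% Status and headers arrive before any body chunks.
            collect(Ref, Acc);
        {katipo_data, Ref, Chunk} ->
            %% Accumulate body chunks in reverse order.
            collect(Ref, [Chunk | Acc]);
        {katipo_done, Ref} ->
            {ok, iolist_to_binary(lists:reverse(Acc))};
        {katipo_error, Ref, ErrorMap} ->
            {error, ErrorMap}
    after 5000 ->
        {error, timeout}
    end.
```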

The async API now returns an opaque async_handle() instead of a raw reference. The handle embeds the worker name for body chunk routing and a monitor ref for worker death detection in await/1,2. Callers use katipo:ref(Handle) to extract the reference for manual receive patterns.
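The two consumption styles for the opaque handle might look like this. The `await/2` return shape and the `handle_ok`/`handle_err` helpers are illustrative assumptions, not confirmed by the diff.

```erlang
%% Option 1: block until the response arrives.
{ok, H1} = katipo:async_get(Pool, Url, #{}),
{ok, #{status := _Status}} = katipo:await(H1, 5000),

%% Option 2: manual receive, extracting the ref from the opaque handle.
{ok, H2} = katipo:async_get(Pool, Url, #{}),
Ref = katipo:ref(H2),
receive
    {katipo_response, Ref, ResponseMap} -> handle_ok(ResponseMap);
    {katipo_error, Ref, ErrorMap} -> handle_err(ErrorMap)
end.
```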

Async responses now include metrics in the response map, matching what sync requests return. The worker monitors the caller process for streaming body requests and auto-finishes the upload if the caller dies, preventing worker hangs.

The C port gains upload_read_cb using curl's CURLOPT_READFUNCTION with pause/unpause, body message parsing in erl_input via atom-type peeking, and a ConnInfo linked list for ref-based lookup. Late chunks after body_done are silently dropped.

Tested with 28 unit tests covering happy paths, edge cases (double finish, send after finish, empty body, large body, bidirectional streaming), failure modes (worker death, caller death, timeouts), and a PropEr statem suite that generates random sequences of uploads, gets, chunks, finishes, timeouts, and recovery across the shared pool, verifying body content ordering and metrics on every response.

puzza007 and others added 3 commits March 21, 2026 07:29
Modify the C port to optionally stream response data as it arrives
rather than buffering the entire response in memory. When stream =>
true is passed in async request opts, the port sends three message
types:

  {headers, From, {Status, Headers, CookieJar}}  - status + headers
  {chunk, From, Data}                             - body chunk
  {done, From, Metrics}                           - transfer complete

The Erlang worker forwards these as:

  {katipo_response, Ref, {status, Status, Headers, CookieJar}}
  {katipo_data, Ref, Chunk}
  {katipo_done, Ref}

Non-streaming async and sync paths are unchanged. Errors use the
existing {katipo_error, Ref, ErrorMap} format in both modes.

Passing stream => true to the sync API (req/2, get/2, etc.) returns
a bad_opts error since streaming requires the async message-based API.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduce stream_body option for incrementally uploading request bodies
via send_body/2 and finish_body/1. Uses curl's CURLOPT_READFUNCTION
with pause/unpause to integrate with the existing libevent loop.

Replace raw reference() return from async_* functions with an opaque
async_handle() that embeds the worker name (for body chunk routing)
and a monitor ref (for worker death detection in await/1,2).

The worker monitors the caller process for streaming body requests
and auto-finishes the upload if the caller dies, preventing hangs.

Also includes metrics in async responses, matching sync behavior.
Property-based stateful tests exercising random sequences of
streaming uploads, non-streaming gets, timeouts, and recovery.
The model tracks accumulated chunks per handle and verifies the
server receives them in order with correct content. Covers
interleaved streaming/non-streaming requests on the same pool,
timeout-then-recovery (await before finish, then finish and
re-await), and concurrent upload isolation across workers.

Copilot AI left a comment


Pull request overview

Adds bidirectional streaming support to Katipo’s async HTTP API, including an opaque async handle type, streaming request body upload helpers, and expanded test coverage (CT + PropEr) to validate streaming behavior.

Changes:

  • Introduces async request APIs (async_get/post/put/..., async_req/2) and an opaque async_handle() with ref/1, plus await/1,2.
  • Adds streaming response support (stream => true) and streaming request-body upload support (stream_body => true with send_body/2, finish_body/1).
  • Extends the C port to support streaming output and incremental upload via a read callback + pause/unpause, and adds extensive CT/PropEr tests.

Reviewed changes

Copilot reviewed 4 out of 5 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/katipo.erl Implements async handle API, await/ref helpers, streaming response routing, and streaming upload plumbing.
c_src/katipo.c Adds streaming response emission, upload read callback, and message parsing for body chunk/done control.
test/katipo_SUITE.erl Adds CT coverage for async API, streaming responses, streaming request bodies, and failure modes.
test/katipo_stream_body_statem.erl Adds a PropEr statem suite for randomized streaming upload sequences and invariants.
rebar.config Adds PropEr as a test dependency.


Comment thread src/katipo.erl
Comment on lines +748 to +761
Opts2 = maps:remove(reply_to, Opts),
case process_opts(Opts2) of
{ok, #req{url = undefined}} ->
{error, error_map(bad_opts, <<"[{url,undefined}]">>)};
{ok, Req} ->
UserRef = make_ref(),
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout = Timeout},
WorkerName = wpool_pool:random_worker(PoolName),
MonRef = erlang:monitor(process, WorkerName),
gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
{ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
{error, _} = Error ->
Error

Copilot AI Mar 22, 2026


async_req/2 strips reply_to from the opts map before calling process_opts/1, which means reply_to is never validated (despite having an opt(reply_to, ...) clause). If a non-pid is passed, erlang:monitor/2 and/or message sending can crash. Validate reply_to explicitly before process_opts/1 (or keep it in the map for option validation) and return bad_opts on invalid values.

Suggested change
Opts2 = maps:remove(reply_to, Opts),
case process_opts(Opts2) of
{ok, #req{url = undefined}} ->
{error, error_map(bad_opts, <<"[{url,undefined}]">>)};
{ok, Req} ->
UserRef = make_ref(),
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout = Timeout},
WorkerName = wpool_pool:random_worker(PoolName),
MonRef = erlang:monitor(process, WorkerName),
gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
{ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
{error, _} = Error ->
Error
case is_pid(ReplyTo) of
false ->
{error, error_map(bad_opts, <<"[{reply_to,invalid}]">>)};
true ->
Opts2 = maps:remove(reply_to, Opts),
case process_opts(Opts2) of
{ok, #req{url = undefined}} ->
{error, error_map(bad_opts, <<"[{url,undefined}]">>)};
{ok, Req} ->
UserRef = make_ref(),
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout = Timeout},
WorkerName = wpool_pool:random_worker(PoolName),
MonRef = erlang:monitor(process, WorkerName),
gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
{ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
{error, _} = Error ->
Error
end

Comment thread src/katipo.erl
Comment on lines +753 to +760
UserRef = make_ref(),
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout = Timeout},
WorkerName = wpool_pool:random_worker(PoolName),
MonRef = erlang:monitor(process, WorkerName),
gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
{ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
{error, _} = Error ->

Copilot AI Mar 22, 2026


async_req/2 creates a monitor (MonRef) and stores it inside an opaque handle. If callers use the documented manual receive pattern via katipo:ref/1 (instead of await/1,2), they have no way to demonitor, so monitors (and later 'DOWN' messages) will leak until the worker dies. Consider creating the worker monitor inside await/2 (so it always gets cleaned up), or provide an explicit API to cancel/demonitor handles.

Comment thread src/katipo.erl
Comment on lines +735 to +742
The response is delivered as a `{katipo_response, Ref, ResponseMap}` or
`{katipo_error, Ref, ErrorMap}` message to the process specified by the `reply_to`
option (defaults to `self()`). Use `ref/1` to extract the `Ref` for matching.

Use `await/1,2` to block until the response arrives.

For streaming request bodies, pass `stream_body => true` and use
`send_body/2` and `finish_body/1` to send the body incrementally.

Copilot AI Mar 22, 2026


The async_req/2 doc states responses are delivered as {katipo_response, Ref, ResponseMap} / {katipo_error, Ref, ErrorMap} and suggests await/1,2 for waiting. In streaming mode (stream => true), the implementation emits {katipo_response, Ref, {status,...}}, {katipo_data,...}, and {katipo_done,...}, and await/1,2 will never complete. Update the doc to describe the streaming message protocol and clarify that await/1,2 is only for non-streaming responses (or extend await/1,2 to support streaming completion).

Suggested change
The response is delivered as a `{katipo_response, Ref, ResponseMap}` or
`{katipo_error, Ref, ErrorMap}` message to the process specified by the `reply_to`
option (defaults to `self()`). Use `ref/1` to extract the `Ref` for matching.
Use `await/1,2` to block until the response arrives.
For streaming request bodies, pass `stream_body => true` and use
`send_body/2` and `finish_body/1` to send the body incrementally.
### Non-streaming responses
When the `stream` option is not enabled (the default), the response is delivered
as a single `{katipo_response, Ref, ResponseMap}` or
`{katipo_error, Ref, ErrorMap}` message to the process specified by the
`reply_to` option (defaults to `self()`). Use `ref/1` to extract the `Ref`
for matching.
In this non-streaming mode, you can use `await/1,2` to block until the response
arrives.
### Streaming responses
When `stream => true` is set in the request options, the response is delivered
as a sequence of messages:
* `{katipo_response, Ref, {status, StatusCode, Headers}}` – initial status
and headers,
* zero or more `{katipo_data, Ref, Chunk}` messages – response body chunks,
* a final `{katipo_done, Ref, ResponseMap}` or
`{katipo_error, Ref, ErrorMap}` message – completion or error.
In streaming mode, you must handle these messages explicitly (for example with
a `receive` loop); `await/1,2` does not complete for streaming responses.
### Streaming request bodies
For streaming request bodies, pass `stream_body => true` and use `send_body/2`
and `finish_body/1` to send the body incrementally. This affects how the
request body is sent but does not change the response delivery semantics
described above.

Comment thread src/katipo.erl
Comment on lines +1074 to +1079
handle_stream_done(From, _Metrics, Reqs,
State = #state{user_refs = URefs, caller_monitors = CMs}) ->
{URefs2, CMs2} = case maps:find(From, Reqs) of
{ok, {Tref, {async, ReplyTo, UserRef}}} ->
_ = erlang:cancel_timer(Tref),
ReplyTo ! {katipo_done, UserRef},

Copilot AI Mar 22, 2026


handle_stream_done/5 receives Metrics from the port message ({done, From, Metrics}) but ignores it and only sends {katipo_done, UserRef}. This drops metrics that were already computed/encoded in the C port and prevents callers from getting timing data for streamed responses. Either forward the metrics to Erlang callers (e.g., include them in the done message) or stop sending them from the port to avoid wasted work.

Suggested change
handle_stream_done(From, _Metrics, Reqs,
State = #state{user_refs = URefs, caller_monitors = CMs}) ->
{URefs2, CMs2} = case maps:find(From, Reqs) of
{ok, {Tref, {async, ReplyTo, UserRef}}} ->
_ = erlang:cancel_timer(Tref),
ReplyTo ! {katipo_done, UserRef},
handle_stream_done(From, Metrics, Reqs,
State = #state{user_refs = URefs, caller_monitors = CMs}) ->
{URefs2, CMs2} = case maps:find(From, Reqs) of
{ok, {Tref, {async, ReplyTo, UserRef}}} ->
_ = erlang:cancel_timer(Tref),
ReplyTo ! {katipo_done, UserRef, Metrics},
