feat: add streaming request/response support for async API #174
Conversation
Modify the C port to optionally stream response data as it arrives
rather than buffering the entire response in memory. When stream =>
true is passed in async request opts, the port sends three message
types:
- `{headers, From, {Status, Headers, CookieJar}}` – status + headers
- `{chunk, From, Data}` – body chunk
- `{done, From, Metrics}` – transfer complete
The Erlang worker forwards these as:
- `{katipo_response, Ref, {status, Status, Headers, CookieJar}}`
- `{katipo_data, Ref, Chunk}`
- `{katipo_done, Ref}`
Non-streaming async and sync paths are unchanged. Errors use the
existing `{katipo_error, Ref, ErrorMap}` format in both modes.
Passing `stream => true` to the sync API (`req/2`, `get/2`, etc.) returns
a `bad_opts` error, since streaming requires the async message-based API.
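A caller that enables streaming can consume these messages with a plain `receive` loop. The sketch below assumes a `Ref` extracted from an async handle with `katipo:ref/1`; the message shapes are the ones listed above:

```erlang
%% Sketch: accumulate a streamed response body into a single binary.
%% Ref is obtained via katipo:ref(Handle) after an async request with
%% stream => true in the opts.
stream_loop(Ref, Acc) ->
    receive
        {katipo_response, Ref, {status, Status, Headers, _CookieJar}} ->
            %% Status and headers arrive first; keep reading chunks.
            io:format("status ~p, headers ~p~n", [Status, Headers]),
            stream_loop(Ref, Acc);
        {katipo_data, Ref, Chunk} ->
            stream_loop(Ref, [Chunk | Acc]);
        {katipo_done, Ref} ->
            {ok, iolist_to_binary(lists:reverse(Acc))};
        {katipo_error, Ref, ErrorMap} ->
            {error, ErrorMap}
    end.
```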
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Introduce a `stream_body` option for incrementally uploading request bodies via `send_body/2` and `finish_body/1`. Uses curl's `CURLOPT_READFUNCTION` with pause/unpause to integrate with the existing libevent loop.

Replace the raw `reference()` return from the `async_*` functions with an opaque `async_handle()` that embeds the worker name (for body chunk routing) and a monitor ref (for worker death detection in `await/1,2`). The worker monitors the caller process for streaming body requests and auto-finishes the upload if the caller dies, preventing hangs.

Also includes metrics in async responses, matching sync behavior.
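A streaming upload under those options could look roughly like this. This is a sketch: `send_body/2`, `finish_body/1`, and the `stream_body` option are from this PR, while the exact `async_post` arity, the pool name, and the URL are illustrative assumptions:

```erlang
%% Sketch: upload a body in two chunks, then block for the response.
%% PoolName is an existing katipo pool; the URL is a placeholder.
upload_example(PoolName) ->
    Opts = #{stream_body => true},
    {ok, Handle} = katipo:async_post(PoolName, <<"https://example.com/upload">>, Opts),
    ok = katipo:send_body(Handle, <<"first chunk">>),
    ok = katipo:send_body(Handle, <<"second chunk">>),
    ok = katipo:finish_body(Handle),
    katipo:await(Handle, 5000).
```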
Add property-based stateful tests exercising random sequences of streaming uploads, non-streaming gets, timeouts, and recovery. The model tracks accumulated chunks per handle and verifies the server receives them in order with correct content. Covers interleaved streaming/non-streaming requests on the same pool, timeout-then-recovery (await before finish, then finish and re-await), and concurrent upload isolation across workers.
Pull request overview
Adds bidirectional streaming support to Katipo’s async HTTP API, including an opaque async handle type, streaming request body upload helpers, and expanded test coverage (CT + PropEr) to validate streaming behavior.
Changes:
- Introduces async request APIs (`async_get/post/put/...`, `async_req/2`) and an opaque `async_handle()` with `ref/1`, plus `await/1,2`.
- Adds streaming response support (`stream => true`) and streaming request-body upload support (`stream_body => true` with `send_body/2`, `finish_body/1`).
- Extends the C port to support streaming output and incremental upload via a read callback + pause/unpause, and adds extensive CT/PropEr tests.
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/katipo.erl` | Implements async handle API, await/ref helpers, streaming response routing, and streaming upload plumbing. |
| `c_src/katipo.c` | Adds streaming response emission, upload read callback, and message parsing for body chunk/done control. |
| `test/katipo_SUITE.erl` | Adds CT coverage for async API, streaming responses, streaming request bodies, and failure modes. |
| `test/katipo_stream_body_statem.erl` | Adds a PropEr statem suite for randomized streaming upload sequences and invariants. |
| `rebar.config` | Adds PropEr as a test dependency. |
```erlang
Opts2 = maps:remove(reply_to, Opts),
case process_opts(Opts2) of
    {ok, #req{url = undefined}} ->
        {error, error_map(bad_opts, <<"[{url,undefined}]">>)};
    {ok, Req} ->
        UserRef = make_ref(),
        Timeout = ?MODULE:get_timeout(Req),
        Req2 = Req#req{timeout = Timeout},
        WorkerName = wpool_pool:random_worker(PoolName),
        MonRef = erlang:monitor(process, WorkerName),
        gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
        {ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
    {error, _} = Error ->
        Error
```
async_req/2 strips reply_to from the opts map before calling process_opts/1, which means reply_to is never validated (despite having an opt(reply_to, ...) clause). If a non-pid is passed, erlang:monitor/2 and/or message sending can crash. Validate reply_to explicitly before process_opts/1 (or keep it in the map for option validation) and return bad_opts on invalid values.
Suggested change:

```erlang
case is_pid(ReplyTo) of
    false ->
        {error, error_map(bad_opts, <<"[{reply_to,invalid}]">>)};
    true ->
        Opts2 = maps:remove(reply_to, Opts),
        case process_opts(Opts2) of
            {ok, #req{url = undefined}} ->
                {error, error_map(bad_opts, <<"[{url,undefined}]">>)};
            {ok, Req} ->
                UserRef = make_ref(),
                Timeout = ?MODULE:get_timeout(Req),
                Req2 = Req#req{timeout = Timeout},
                WorkerName = wpool_pool:random_worker(PoolName),
                MonRef = erlang:monitor(process, WorkerName),
                gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
                {ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
            {error, _} = Error ->
                Error
        end
end
```
```erlang
UserRef = make_ref(),
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout = Timeout},
WorkerName = wpool_pool:random_worker(PoolName),
MonRef = erlang:monitor(process, WorkerName),
gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}),
{ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}};
{error, _} = Error ->
```
async_req/2 creates a monitor (MonRef) and stores it inside an opaque handle. If callers use the documented manual receive pattern via katipo:ref/1 (instead of await/1,2), they have no way to demonitor, so monitors (and later 'DOWN' messages) will leak until the worker dies. Consider creating the worker monitor inside await/2 (so it always gets cleaned up), or provide an explicit API to cancel/demonitor handles.
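One way to implement the fix this comment suggests is to create the monitor inside `await/2` itself, so it is always demonitored on exit regardless of how the caller obtained the response. A sketch, assuming the handle record shown above and simplified timeout handling:

```erlang
%% Sketch: monitor the worker only for the duration of await/2, so the
%% monitor is cleaned up whether or not the caller ever uses ref/1.
await(#async_handle{ref = Ref, worker = WorkerName}, Timeout) ->
    MonRef = erlang:monitor(process, WorkerName),
    try
        receive
            {katipo_response, Ref, Response} -> {ok, Response};
            {katipo_error, Ref, ErrorMap} -> {error, ErrorMap};
            {'DOWN', MonRef, process, _, Reason} -> {error, {worker_down, Reason}}
        after Timeout ->
            {error, timeout}
        end
    after
        %% flush discards any 'DOWN' message already in the mailbox
        erlang:demonitor(MonRef, [flush])
    end.
```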
> The response is delivered as a `{katipo_response, Ref, ResponseMap}` or
> `{katipo_error, Ref, ErrorMap}` message to the process specified by the `reply_to`
> option (defaults to `self()`). Use `ref/1` to extract the `Ref` for matching.
>
> Use `await/1,2` to block until the response arrives.
>
> For streaming request bodies, pass `stream_body => true` and use
> `send_body/2` and `finish_body/1` to send the body incrementally.
The async_req/2 doc states responses are delivered as {katipo_response, Ref, ResponseMap} / {katipo_error, Ref, ErrorMap} and suggests await/1,2 for waiting. In streaming mode (stream => true), the implementation emits {katipo_response, Ref, {status,...}}, {katipo_data,...}, and {katipo_done,...}, and await/1,2 will never complete. Update the doc to describe the streaming message protocol and clarify that await/1,2 is only for non-streaming responses (or extend await/1,2 to support streaming completion).
Suggested change:

> ### Non-streaming responses
>
> When the `stream` option is not enabled (the default), the response is delivered
> as a single `{katipo_response, Ref, ResponseMap}` or
> `{katipo_error, Ref, ErrorMap}` message to the process specified by the
> `reply_to` option (defaults to `self()`). Use `ref/1` to extract the `Ref`
> for matching.
>
> In this non-streaming mode, you can use `await/1,2` to block until the response
> arrives.
>
> ### Streaming responses
>
> When `stream => true` is set in the request options, the response is delivered
> as a sequence of messages:
>
> * `{katipo_response, Ref, {status, StatusCode, Headers}}` – initial status
>   and headers,
> * zero or more `{katipo_data, Ref, Chunk}` messages – response body chunks,
> * a final `{katipo_done, Ref, ResponseMap}` or
>   `{katipo_error, Ref, ErrorMap}` message – completion or error.
>
> In streaming mode, you must handle these messages explicitly (for example with
> a `receive` loop); `await/1,2` does not complete for streaming responses.
>
> ### Streaming request bodies
>
> For streaming request bodies, pass `stream_body => true` and use `send_body/2`
> and `finish_body/1` to send the body incrementally. This affects how the
> request body is sent but does not change the response delivery semantics
> described above.
```erlang
handle_stream_done(From, _Metrics, Reqs,
                   State = #state{user_refs = URefs, caller_monitors = CMs}) ->
    {URefs2, CMs2} = case maps:find(From, Reqs) of
        {ok, {Tref, {async, ReplyTo, UserRef}}} ->
            _ = erlang:cancel_timer(Tref),
            ReplyTo ! {katipo_done, UserRef},
```
handle_stream_done/5 receives Metrics from the port message ({done, From, Metrics}) but ignores it and only sends {katipo_done, UserRef}. This drops metrics that were already computed/encoded in the C port and prevents callers from getting timing data for streamed responses. Either forward the metrics to Erlang callers (e.g., include them in the done message) or stop sending them from the port to avoid wasted work.
Suggested change:

```erlang
handle_stream_done(From, Metrics, Reqs,
                   State = #state{user_refs = URefs, caller_monitors = CMs}) ->
    {URefs2, CMs2} = case maps:find(From, Reqs) of
        {ok, {Tref, {async, ReplyTo, UserRef}}} ->
            _ = erlang:cancel_timer(Tref),
            ReplyTo ! {katipo_done, UserRef, Metrics},
```
Adds streaming support in both directions for the async API, along with an opaque handle type and metrics parity with sync requests.
Streaming responses let callers receive body data incrementally via `{katipo_data, Ref, Chunk}` messages by passing `stream => true`. Streaming request bodies let callers upload data incrementally via `send_body/2` and `finish_body/1` by passing `stream_body => true`. Both can be combined for full bidirectional streaming.

The async API now returns an opaque `async_handle()` instead of a raw reference. The handle embeds the worker name for body chunk routing and a monitor ref for worker death detection in `await/1,2`. Callers use `katipo:ref(Handle)` to extract the reference for manual receive patterns.

Async responses now include `metrics` in the response map, matching the data sync requests get. The worker monitors the caller process for streaming body requests and auto-finishes the upload if the caller dies, preventing worker hangs.

The C port gains `upload_read_cb` using curl's `CURLOPT_READFUNCTION` with pause/unpause, body message parsing in `erl_input` via atom-type peeking, and a ConnInfo linked list for ref-based lookup. Late chunks after `body_done` are silently dropped.

Tested with 28 unit tests covering happy paths, edge cases (double finish, send after finish, empty body, large body, bidirectional streaming), failure modes (worker death, caller death, timeouts), and a PropEr statem suite that generates random sequences of uploads, gets, chunks, finishes, timeouts, and recovery across the shared pool, verifying body content ordering and metrics on every response.