Conversation
Adds a batch-aware method to MessageDeliverer that transports can override for concurrent I/O. Backward compatible via default implementation that loops over single-entry deliver(). Changes: - MessageDeliverer.deliverBatch(entries) — default sequential implementation preserves input order and per-entry result classification. - CompositeMessageDeliverer.deliverBatch — groups entries by deliveryType, delegates each sub-batch to the matching deliverer's deliverBatch, fails permanently for unknown types (consistent with single-entry deliver). - OutboxEntryProcessor.processBatch(entries) — calls deliverer.deliverBatch once, applies retry policy per result, returns processed entries in input order. Existing process(entry) preserved for callers. - OutboxProcessor.processNext switches from per-entry forEach to a single processBatch call, AND now returns Int (count of processed entries). This subsumes KOJAK-71 — the count is exposed naturally as part of this refactor. Per-entry duration semantics in OutboxProcessingEvent change in batch path: each entry receives the wall-clock duration of the whole batch (because transports may overlap per-entry I/O internally — Kafka's record batching, parallel HTTP sendAsync — making per-entry timing meaningless). Documented in KDoc. MicrometerOutboxListener (KOJAK-44) is unaffected — it uses onBatchProcessed for batch timing and only counts per-outcome events. No throughput change expected (default impl is identical sequential loop). Smoke benchmark on Kafka batchSize=50: 8.321 ms/op vs baseline 8.665 ms/op — within measurement noise. Real throughput gain comes when transports override deliverBatch (KOJAK-73 Kafka, KOJAK-74 HTTP). Closes KOJAK-71 (processNext returns Int — folded in here as the same refactor naturally exposes the count).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a batch-aware method to
MessageDelivererthat transports can override for concurrent I/O. Pure interface introduction with sequential default implementation — zero behavior change today, but unlocks the next two PRs in the optimization chain.KOJAK-72 — deliverBatch interface
Closes KOJAK-71 —
processNext returns Intwas folded in here since the same refactor naturally exposes the count.Changes
MessageDeliverer.deliverBatch(entries)Default sequential implementation — loops over single-entry
deliver(), preserves input order and per-entry result classification. Backward compatible: existing implementations don't need to change.CompositeMessageDeliverer.deliverBatchGroups entries by
deliveryType, delegates each sub-batch to the matching deliverer'sdeliverBatch. Fails permanently for unknown types (consistent with single-entrydeliver). Results re-assembled in original input order.OutboxEntryProcessor.processBatch(entries)New method — calls
deliverer.deliverBatchonce, applies retry policy per result. Existingprocess(entry)preserved for single-entry callers (deduplicated via sharedapplyResulthelper).OutboxProcessor.processNextforEach { entryProcessor.process(...) }to a singleprocessBatch(claimed)callInt(count of processed entries) — subsumes KOJAK-71Semantic note: per-entry duration in batch path
OutboxProcessingEvent.durationsemantics change in the batch path: each entry receives the wall-clock duration of the whole batch, not its individual delivery. This is intentional — transports overridingdeliverBatch(Kafka fire-flush-await, parallel HTTPsendAsync) overlap per-entry I/O internally, making per-entry timing meaningless.MicrometerOutboxListener(KOJAK-44) is unaffected — it usesonBatchProcessedfor batch timing and only counts per-outcome events.Documented in
OutboxProcessorKDoc.Verification
./gradlew test -x :okapi-integration-tests:test)./gradlew :okapi-integration-tests:test— Postgres/MySQL/Kafka/HTTP via Testcontainers)What's next
This PR introduces the interface. The actual throughput improvements come when transports override
deliverBatch:KafkaMessageDeliverer.deliverBatch— fire-flush-await pattern, expected 10-100× Kafka throughput improvementHttpMessageDeliverer.deliverBatch— parallelhttpClient.sendAsync, expected 10-50× HTTP throughput at realistic webhook latencyTest plan
MessageDelivererTest(default deliverBatch behavior),CompositeMessageDelivererTest(grouping + permanent failure for unknown types + override usage)OutboxEntryProcessorTestwithprocessBatchcases (mixed results, empty input)OutboxProcessorTestwith return-value assertions