[feat][client] Support null value messages#563
[feat][client] Support null value messages#563Bhargavkonidena wants to merge 2 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds first-class “null value” (tombstone) support to the Pulsar C++ client so producers can emit tombstones on compacted topics and consumers can distinguish null vs empty payloads, including correct handling for batched messages and C bindings.
Changes:
- Introduces
MessageBuilder::setNullValue()andMessage::hasNullValue()(plus C API equivalents) to represent/inspect tombstone messages. - Preserves
null_valuethrough batch message serialization/deserialization. - Adds unit + integration tests covering null value behavior across Message, batch parsing, TableView, and Reader scenarios.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/TableViewTest.cc | Adds integration tests validating TableView tombstone behavior and empty-vs-null scenarios. |
| tests/ReaderTest.cc | Adds integration tests for reading compacted topics with tombstones and tombstone property preservation. |
| tests/MessageTest.cc | Adds unit tests for basic null value vs empty message semantics. |
| tests/BatchMessageTest.cc | Adds unit test ensuring null_value survives batch serialize/deserialize. |
| lib/c/c_Message.cc | Exposes C API wrappers to set/check null value. |
| lib/MessageBuilder.cc | Implements MessageBuilder::setNullValue(). |
| lib/Message.cc | Implements Message::hasNullValue() and copies null_value from SingleMessageMetadata for batch entries. |
| lib/Commands.cc | Serializes null_value into SingleMessageMetadata for batch payloads. |
| include/pulsar/c/message.h | Declares/document C API for null value. |
| include/pulsar/MessageBuilder.h | Declares/documents setNullValue(). |
| include/pulsar/Message.h | Declares/documents hasNullValue(). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| MessageBuilder& MessageBuilder::setNullValue() { | ||
| checkMetadata(); |
There was a problem hiding this comment.
MessageBuilder::setNullValue() only flips the metadata flag; it doesn't clear any previously-set payload, and the various setContent(...) overloads don't clear null_value. This allows building messages with non-empty payload but null_value=true (or vice versa), which will be interpreted incorrectly by consumers/table views. Make the two states mutually exclusive by clearing the payload/key-value when setting null value, and clearing null_value whenever content is set.
| checkMetadata(); | |
| checkMetadata(); | |
| // Ensure null-value messages do not carry any payload or key-related metadata. | |
| // Clear the payload buffer. | |
| impl_->payload = SharedBuffer(); | |
| // Clear key-related metadata so that the null value state is self-consistent. | |
| impl_->metadata.clear_partition_key(); | |
| impl_->metadata.clear_ordering_key(); |
| std::string compactUrl = | ||
| adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" + | ||
| std::to_string(time(nullptr)) + "/compaction"; | ||
| // Note: Compaction is async, we just trigger it | ||
| makePutRequest(compactUrl, ""); |
There was a problem hiding this comment.
The compaction admin URL is built using a fresh time(nullptr) value, which can differ from the one used to construct topicName. This can trigger compaction on a different (nonexistent) topic than the one you produced to. Build compactUrl directly from topicName (or extract the topic suffix once) and assert the returned HTTP status from makePutRequest (e.g., 204/409) so the test fails if compaction isn't triggered.
| std::string compactUrl = | |
| adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" + | |
| std::to_string(time(nullptr)) + "/compaction"; | |
| // Note: Compaction is async, we just trigger it | |
| makePutRequest(compactUrl, ""); | |
| // Build compaction URL directly from topicName to avoid mismatches | |
| std::string topicPath = topicName; | |
| std::size_t schemePos = topicPath.find("://"); | |
| if (schemePos != std::string::npos) { | |
| topicPath.erase(schemePos, 3); // remove "://", e.g., "persistent://public/..." -> "persistent/public/..." | |
| } | |
| std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction"; | |
| // Note: Compaction is async, we just trigger it, but assert that the request is accepted | |
| int res = makePutRequest(compactUrl, ""); | |
| ASSERT_FALSE(res != 204 && res != 409); |
| Result res = reader.readNext(msg, 3000); | ||
| if (res != ResultOk) { | ||
| break; | ||
| } |
There was a problem hiding this comment.
This read loop can silently succeed without validating behavior: it breaks when hasMessageAvailable is false or readNext fails, and the final assertion can still pass even if no messages were read at all. Prefer asserting readNext succeeds while hasMessageAvailable==true (or reading a known number of messages), and then assert on concrete outcomes (e.g., key1 == "value1-updated" and key2 is tombstone/absent after compaction).
| Result res = reader.readNext(msg, 3000); | |
| if (res != ResultOk) { | |
| break; | |
| } | |
| ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); |
Fixes #489
Motivation
Currently the producer cannot send a null value message, which is used as a tombstone for a specific key on a compacted topic.
In addition, the consumer cannot differentiate empty values from null values, because all these values are presented as empty std::strings.
Modifications
Verifying this change
Unit Tests (no broker required):
MessageTest.testNullValueMessage - Basic null value API functionality
MessageTest.testEmptyMessage - Verifies empty message is NOT a null value
MessageTest.testEmptyStringNotNullValue - Verifies empty string "" is NOT the same as null value
BatchMessageTest.testParseMessageBatchEntryWithNullValue - Verifies null value survives batch serialization/deserialization
Integration Tests (require Pulsar broker):
TableViewTest.testNullValueTombstone - Send tombstone, verify key removed from TableView
TableViewTest.testNullValueVsEmptyString - Compare empty string vs null value behavior
ReaderTest.testReadCompactedWithNullValue - Reader detects tombstones with hasNullValue()
ReaderTest.testNullValueMessageProperties - Null value message preserves properties
Documentation
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)