Skip to content

[feat][client] Support null value messages#563

Open
Bhargavkonidena wants to merge 2 commits intoapache:mainfrom
Bhargavkonidena:issue_489
Open

[feat][client] Support null value messages#563
Bhargavkonidena wants to merge 2 commits intoapache:mainfrom
Bhargavkonidena:issue_489

Conversation

@Bhargavkonidena
Copy link
Copy Markdown

@Bhargavkonidena Bhargavkonidena commented Mar 30, 2026

Fixes #489

Motivation

Currently the producer cannot send a null value message, which is used as a tombstone for a specific key on a compacted topic.
In addition, the consumer cannot differentiate empty values from null values, because all these values are presented as empty std::strings.

Modifications

  1. Added MessageBuilder::setNullValue() method that sets the null_value field in MessageMetadata
  2. Added Message::hasNullValue() method to check if a message has null value
  3. Updated batch message serialization (Commands.cc) to preserve null_value in SingleMessageMetadata
  4. Updated batch message deserialization (Message.cc) to copy null_value from SingleMessageMetadata
  5. Added C API functions: pulsar_message_set_null_value() and pulsar_message_has_null_value()
  6. Added unit tests and integration tests

Verifying this change

  • Make sure that the change passes the CI checks.

Unit Tests (no broker required):

  • MessageTest.testNullValueMessage - Basic null value API functionality

  • MessageTest.testEmptyMessage - Verifies empty message is NOT a null value

  • MessageTest.testEmptyStringNotNullValue - Verifies empty string "" is NOT the same as null value

  • BatchMessageTest.testParseMessageBatchEntryWithNullValue - Verifies null value survives batch serialization/deserialization
    Integration Tests (require Pulsar broker):

  • TableViewTest.testNullValueTombstone - Send tombstone, verify key removed from TableView

  • TableViewTest.testNullValueVsEmptyString - Compare empty string vs null value behavior

  • ReaderTest.testReadCompactedWithNullValue - Reader detects tombstones with hasNullValue()

  • ReaderTest.testNullValueMessageProperties - Null value message preserves properties

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds first-class “null value” (tombstone) support to the Pulsar C++ client so producers can emit tombstones on compacted topics and consumers can distinguish null vs empty payloads, including correct handling for batched messages and C bindings.

Changes:

  • Introduces MessageBuilder::setNullValue() and Message::hasNullValue() (plus C API equivalents) to represent/inspect tombstone messages.
  • Preserves null_value through batch message serialization/deserialization.
  • Adds unit + integration tests covering null value behavior across Message, batch parsing, TableView, and Reader scenarios.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/TableViewTest.cc Adds integration tests validating TableView tombstone behavior and empty-vs-null scenarios.
tests/ReaderTest.cc Adds integration tests for reading compacted topics with tombstones and tombstone property preservation.
tests/MessageTest.cc Adds unit tests for basic null value vs empty message semantics.
tests/BatchMessageTest.cc Adds unit test ensuring null_value survives batch serialize/deserialize.
lib/c/c_Message.cc Exposes C API wrappers to set/check null value.
lib/MessageBuilder.cc Implements MessageBuilder::setNullValue().
lib/Message.cc Implements Message::hasNullValue() and copies null_value from SingleMessageMetadata for batch entries.
lib/Commands.cc Serializes null_value into SingleMessageMetadata for batch payloads.
include/pulsar/c/message.h Declares/document C API for null value.
include/pulsar/MessageBuilder.h Declares/documents setNullValue().
include/pulsar/Message.h Declares/documents hasNullValue().

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}

MessageBuilder& MessageBuilder::setNullValue() {
checkMetadata();
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageBuilder::setNullValue() only flips the metadata flag; it doesn't clear any previously-set payload, and the various setContent(...) overloads don't clear null_value. This allows building messages with non-empty payload but null_value=true (or vice versa), which will be interpreted incorrectly by consumers/table views. Make the two states mutually exclusive by clearing the payload/key-value when setting null value, and clearing null_value whenever content is set.

Suggested change
checkMetadata();
checkMetadata();
// Ensure null-value messages do not carry any payload or key-related metadata.
// Clear the payload buffer.
impl_->payload = SharedBuffer();
// Clear key-related metadata so that the null value state is self-consistent.
impl_->metadata.clear_partition_key();
impl_->metadata.clear_ordering_key();

Copilot uses AI. Check for mistakes.
Comment on lines +1077 to +1081
std::string compactUrl =
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr)) + "/compaction";
// Note: Compaction is async, we just trigger it
makePutRequest(compactUrl, "");
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compaction admin URL is built using a fresh time(nullptr) value, which can differ from the one used to construct topicName. This can trigger compaction on a different (nonexistent) topic than the one you produced to. Build compactUrl directly from topicName (or extract the topic suffix once) and assert the returned HTTP status from makePutRequest (e.g., 204/409) so the test fails if compaction isn't triggered.

Suggested change
std::string compactUrl =
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr)) + "/compaction";
// Note: Compaction is async, we just trigger it
makePutRequest(compactUrl, "");
// Build compaction URL directly from topicName to avoid mismatches
std::string topicPath = topicName;
std::size_t schemePos = topicPath.find("://");
if (schemePos != std::string::npos) {
topicPath.erase(schemePos, 3); // remove "://", e.g., "persistent://public/..." -> "persistent/public/..."
}
std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction";
// Note: Compaction is async, we just trigger it, but assert that the request is accepted
int res = makePutRequest(compactUrl, "");
ASSERT_FALSE(res != 204 && res != 409);

Copilot uses AI. Check for mistakes.
Comment on lines +1102 to +1105
Result res = reader.readNext(msg, 3000);
if (res != ResultOk) {
break;
}
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read loop can silently succeed without validating behavior: it breaks when hasMessageAvailable is false or readNext fails, and the final assertion can still pass even if no messages were read at all. Prefer asserting readNext succeeds while hasMessageAvailable==true (or reading a known number of messages), and then assert on concrete outcomes (e.g., key1 == "value1-updated" and key2 is tombstone/absent after compaction).

Suggested change
Result res = reader.readNext(msg, 3000);
if (res != ResultOk) {
break;
}
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support null value messages

2 participants