Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
*/
bool hasOrderingKey() const;

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return true if the message has a null value (tombstone)
* false if the message has actual payload data
*/
bool hasNullValue() const;

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
11 changes: 11 additions & 0 deletions include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& disableReplication(bool flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*
* @return the message builder instance
*/
MessageBuilder& setNullValue();

/**
* create a empty message, with no properties or data
*
Expand Down
19 changes: 19 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
*/
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*/
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);

/// Accessor for built messages

/**
Expand Down Expand Up @@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return 1 if the message has a null value, 0 otherwise
*/
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
4 changes: 4 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
metadata.set_sequence_id(msgMetadata.sequence_id());
}

if (msgMetadata.null_value()) {
metadata.set_null_value(true);
}

size_t size = metadata.ByteSizeLong();
std::unique_ptr<char[]> data{new char[size]};
metadata.SerializeToArray(data.get(), size);
Expand Down
13 changes: 13 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
} else {
impl_->metadata.clear_sequence_id();
}

if (singleMetadata.null_value()) {
impl_->metadata.set_null_value(true);
} else {
impl_->metadata.clear_null_value();
}
}

const MessageId& Message::getMessageId() const {
Expand Down Expand Up @@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
return impl_->getOrderingKey();
}

bool Message::hasNullValue() const {
if (impl_) {
return impl_->metadata.null_value();
}
return false;
}

const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
Expand Down
6 changes: 6 additions & 0 deletions lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
return *this;
}

MessageBuilder& MessageBuilder::setNullValue() {
checkMetadata();
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageBuilder::setNullValue() only flips the metadata flag; it doesn't clear any previously-set payload, and the various setContent(...) overloads don't clear null_value. This allows building messages with non-empty payload but null_value=true (or vice versa), which will be interpreted incorrectly by consumers/table views. Make the two states mutually exclusive by clearing the payload/key-value when setting null value, and clearing null_value whenever content is set.

Suggested change
checkMetadata();
checkMetadata();
// Ensure null-value messages do not carry any payload or key-related metadata.
// Clear the payload buffer.
impl_->payload = SharedBuffer();
// Clear key-related metadata so that the null value state is self-consistent.
impl_->metadata.clear_partition_key();
impl_->metadata.clear_ordering_key();

Copilot uses AI. Check for mistakes.
impl_->metadata.set_null_value(true);
return *this;
}

const char* MessageBuilder::data() const {
assert(impl_->payload.data());
return impl_->payload.data();
Expand Down
4 changes: 4 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
message->builder.disableReplication(flag);
}

void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }

int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
return message->message.hasProperty(name);
}
Expand Down Expand Up @@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
30 changes: 30 additions & 0 deletions tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
}
}

TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
std::vector<Message> msgs;
msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());

SharedBuffer payload;
Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
ASSERT_EQ(payload.writableBytes(), 0);

MessageBatch messageBatch;
auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
const std::vector<Message>& messages = messageBatch.messages();

ASSERT_EQ(messages.size(), 3);

ASSERT_TRUE(messages[0].hasNullValue());
ASSERT_EQ(messages[0].getPartitionKey(), "key1");
ASSERT_EQ(messages[0].getLength(), 0);

ASSERT_FALSE(messages[1].hasNullValue());
ASSERT_EQ(messages[1].getPartitionKey(), "key2");
ASSERT_EQ(messages[1].getDataAsString(), "content2");

ASSERT_TRUE(messages[2].hasNullValue());
ASSERT_EQ(messages[2].getPartitionKey(), "key3");
ASSERT_EQ(messages[2].getLength(), 0);
}

TEST(BatchMessageTest, testSendCallback) {
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";

Expand Down
43 changes: 43 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.hasNullValue());
}

{
auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
ASSERT_EQ(msg.getPartitionKey(), "key1");
}

{
auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "key2");
}
}

TEST(MessageTest, testEmptyMessage) {
auto msg = MessageBuilder().build();
ASSERT_FALSE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
}

TEST(MessageTest, testEmptyStringNotNullValue) {
// Empty string message - has content set to ""
auto emptyStringMsg = MessageBuilder().setContent("").build();
ASSERT_FALSE(emptyStringMsg.hasNullValue());
ASSERT_EQ(emptyStringMsg.getLength(), 0);
ASSERT_EQ(emptyStringMsg.getDataAsString(), "");

// Null value message - explicitly marked as null
auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
ASSERT_TRUE(nullValueMsg.hasNullValue());
ASSERT_EQ(nullValueMsg.getLength(), 0);

// Both have length 0, but they are semantically different
// Empty string: the value IS an empty string
// Null value: the value does not exist (tombstone for compaction)
}
125 changes: 125 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,5 +1045,130 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
client.close();
}

TEST(ReaderTest, testReadCompactedWithNullValue) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send messages with keys
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));

// Send a tombstone (null value) for key2
auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getLength(), 0);
ASSERT_EQ(ResultOk, producer.send(tombstone));

// Update key1 with a new value
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));

// Trigger compaction via admin API
{
std::string compactUrl =
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr)) + "/compaction";
// Note: Compaction is async, we just trigger it
makePutRequest(compactUrl, "");
Comment on lines +1077 to +1081
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compaction admin URL is built using a fresh time(nullptr) value, which can differ from the one used to construct topicName. This can trigger compaction on a different (nonexistent) topic than the one you produced to. Build compactUrl directly from topicName (or extract the topic suffix once) and assert the returned HTTP status from makePutRequest (e.g., 204/409) so the test fails if compaction isn't triggered.

Suggested change
std::string compactUrl =
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr)) + "/compaction";
// Note: Compaction is async, we just trigger it
makePutRequest(compactUrl, "");
// Build compaction URL directly from topicName to avoid mismatches
std::string topicPath = topicName;
std::size_t schemePos = topicPath.find("://");
if (schemePos != std::string::npos) {
topicPath.erase(schemePos, 3); // remove "://", e.g., "persistent://public/..." -> "persistent/public/..."
}
std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction";
// Note: Compaction is async, we just trigger it, but assert that the request is accepted
int res = makePutRequest(compactUrl, "");
ASSERT_FALSE(res != 204 && res != 409);

Copilot uses AI. Check for mistakes.
}

// Create a reader with readCompacted enabled
ReaderConfiguration readerConf;
readerConf.setReadCompacted(true);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

// Read all messages and verify we can detect null values
std::map<std::string, std::string> keyValues;
std::set<std::string> nullValueKeys;

for (int i = 0; i < 10; i++) {
bool hasMessageAvailable = false;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) {
break;
}

Message msg;
Result res = reader.readNext(msg, 3000);
if (res != ResultOk) {
break;
}
Comment on lines +1102 to +1105
Copy link

Copilot AI Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read loop can silently succeed without validating behavior: it breaks when hasMessageAvailable is false or readNext fails, and the final assertion can still pass even if no messages were read at all. Prefer asserting readNext succeeds while hasMessageAvailable==true (or reading a known number of messages), and then assert on concrete outcomes (e.g., key1 == "value1-updated" and key2 is tombstone/absent after compaction).

Suggested change
Result res = reader.readNext(msg, 3000);
if (res != ResultOk) {
break;
}
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));

Copilot uses AI. Check for mistakes.

std::string key = msg.getPartitionKey();
if (msg.hasNullValue()) {
nullValueKeys.insert(key);
LOG_INFO("Received null value (tombstone) for key: " << key);
} else {
keyValues[key] = msg.getDataAsString();
LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
}
}

// Verify we received the tombstone for key2
// Note: Without compaction completing, we see all messages including the tombstone
// After compaction, we would only see the latest value for each key
ASSERT_TRUE(nullValueKeys.count("key2") > 0 || keyValues.count("key2") == 0)
<< "key2 should either have a null value or be absent after compaction";

producer.close();
reader.close();
client.close();
}

TEST(ReaderTest, testNullValueMessageProperties) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send a null value message with properties
auto tombstone = MessageBuilder()
.setPartitionKey("user-123")
.setNullValue()
.setProperty("reason", "account-deleted")
.setProperty("deleted-by", "admin")
.build();

ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
ASSERT_EQ(tombstone.getLength(), 0);

ASSERT_EQ(ResultOk, producer.send(tombstone));

// Create a reader and verify the message
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));

// Verify all properties are preserved
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "user-123");
ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
ASSERT_EQ(msg.getLength(), 0);

producer.close();
reader.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
Loading
Loading