diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index 0c67411f..b92ec6a9 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message { */ bool hasOrderingKey() const; + /** + * Check if the message has a null value. + * + * Messages with null values are used as tombstones on compacted topics + * to delete the message for a specific key. + * + * @return true if the message has a null value (tombstone) + * false if the message has actual payload data + */ + bool hasNullValue() const; + /** * Get the UTC based timestamp in milliseconds referring to when the message was published by the client * producer diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h index c2f089f7..89167185 100644 --- a/include/pulsar/MessageBuilder.h +++ b/include/pulsar/MessageBuilder.h @@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder { */ MessageBuilder& disableReplication(bool flag); + /** + * Mark the message as having a null value. + * + * This is used for messages on compacted topics where a null value + * acts as a tombstone for a specific key, removing the message from + * the compacted view. + * + * @return the message builder instance + */ + MessageBuilder& setNullValue(); + /** * create a empty message, with no properties or data * diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h index 1f1f91ff..8aceca52 100644 --- a/include/pulsar/c/message.h +++ b/include/pulsar/c/message.h @@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes */ PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag); +/** + * Mark the message as having a null value. + * + * This is used for messages on compacted topics where a null value + * acts as a tombstone for a specific key, removing the message from + * the compacted view. + */ +PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message); + /// Accessor for built messages /** @@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, */ PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message); +/** + * Check if the message has a null value. + * + * Messages with null values are used as tombstones on compacted topics + * to delete the message for a specific key. + * + * @return 1 if the message has a null value, 0 otherwise + */ +PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message); + #ifdef __cplusplus } #endif diff --git a/lib/Commands.cc b/lib/Commands.cc index 08dc7183..3dd22591 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -871,6 +871,10 @@ static std::pair, size_t> serializeSingleMessageMetadata metadata.set_sequence_id(msgMetadata.sequence_id()); } + if (msgMetadata.null_value()) { + metadata.set_null_value(true); + } + size_t size = metadata.ByteSizeLong(); std::unique_ptr data{new char[size]}; metadata.SerializeToArray(data.get(), size); diff --git a/lib/Message.cc b/lib/Message.cc index df6cff9a..f4e6d695 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE } else { impl_->metadata.clear_sequence_id(); } + + if (singleMetadata.null_value()) { + impl_->metadata.set_null_value(true); + } else { + impl_->metadata.clear_null_value(); + } } const MessageId& Message::getMessageId() const { @@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const { return impl_->getOrderingKey(); } +bool Message::hasNullValue() const { + if (impl_) { + return impl_->metadata.null_value(); + } + return false; +} + const std::string& Message::getTopicName() const { if (!impl_) { return emptyString; diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc index a9e61d47..f52a6848 100644 --- a/lib/MessageBuilder.cc +++ b/lib/MessageBuilder.cc @@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) { return *this; } +MessageBuilder& MessageBuilder::setNullValue() { + checkMetadata(); + impl_->metadata.set_null_value(true); + return *this; +} + const char* MessageBuilder::data() const { assert(impl_->payload.data()); return impl_->payload.data(); diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc index cca04602..51afa8e3 100644 --- a/lib/c/c_Message.cc +++ b/lib/c/c_Message.cc @@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) { message->builder.disableReplication(flag); } +void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); } + int pulsar_message_has_property(pulsar_message_t *message, const char *name) { return message->message.hasProperty(name); } @@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc const char *pulsar_message_get_producer_name(pulsar_message_t *message) { return message->message.getProducerName().c_str(); } + +int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); } diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc index 0b61de1d..0b786c3a 100644 --- a/tests/BatchMessageTest.cc +++ b/tests/BatchMessageTest.cc @@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) { } } +TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) { + std::vector msgs; + msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build()); + msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build()); + msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build()); + + SharedBuffer payload; + Commands::serializeSingleMessagesToBatchPayload(payload, msgs); + ASSERT_EQ(payload.writableBytes(), 0); + + MessageBatch messageBatch; + auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build(); + messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast(msgs.size())); + const std::vector& messages = messageBatch.messages(); + + ASSERT_EQ(messages.size(), 3); + + ASSERT_TRUE(messages[0].hasNullValue()); + ASSERT_EQ(messages[0].getPartitionKey(), "key1"); + ASSERT_EQ(messages[0].getLength(), 0); + + ASSERT_FALSE(messages[1].hasNullValue()); + ASSERT_EQ(messages[1].getPartitionKey(), "key2"); + ASSERT_EQ(messages[1].getDataAsString(), "content2"); + + ASSERT_TRUE(messages[2].hasNullValue()); + ASSERT_EQ(messages[2].getPartitionKey(), "key3"); + ASSERT_EQ(messages[2].getLength(), 0); +} + TEST(BatchMessageTest, testSendCallback) { const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback"; diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc index 688cb330..0ffcc417 100644 --- a/tests/MessageTest.cc +++ b/tests/MessageTest.cc @@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) { auto msg = MessageBuilder().setContent("test").build(); ASSERT_TRUE(msg.getTopicName().empty()); } + +TEST(MessageTest, testNullValueMessage) { + { + auto msg = MessageBuilder().setContent("test").build(); + ASSERT_FALSE(msg.hasNullValue()); + } + + { + auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build(); + ASSERT_TRUE(msg.hasNullValue()); + ASSERT_EQ(msg.getLength(), 0); + ASSERT_EQ(msg.getPartitionKey(), "key1"); + } + + { + auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build(); + ASSERT_TRUE(msg.hasNullValue()); + ASSERT_EQ(msg.getPartitionKey(), "key2"); + } +} + +TEST(MessageTest, testEmptyMessage) { + auto msg = MessageBuilder().build(); + ASSERT_FALSE(msg.hasNullValue()); + ASSERT_EQ(msg.getLength(), 0); +} + +TEST(MessageTest, testEmptyStringNotNullValue) { + // Empty string message - has content set to "" + auto emptyStringMsg = MessageBuilder().setContent("").build(); + ASSERT_FALSE(emptyStringMsg.hasNullValue()); + ASSERT_EQ(emptyStringMsg.getLength(), 0); + ASSERT_EQ(emptyStringMsg.getDataAsString(), ""); + + // Null value message - explicitly marked as null + auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build(); + ASSERT_TRUE(nullValueMsg.hasNullValue()); + ASSERT_EQ(nullValueMsg.getLength(), 0); + + // Both have length 0, but they are semantically different + // Empty string: the value IS an empty string + // Null value: the value does not exist (tombstone for compaction) +} diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index af833ce6..f7a44c74 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -1045,5 +1045,130 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) { client.close(); } +TEST(ReaderTest, testReadCompactedWithNullValue) { + Client client(serviceUrl); + + const std::string topicName = + "persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + // Send messages with keys + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build())); + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build())); + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build())); + + // Send a tombstone (null value) for key2 + auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build(); + ASSERT_TRUE(tombstone.hasNullValue()); + ASSERT_EQ(tombstone.getLength(), 0); + ASSERT_EQ(ResultOk, producer.send(tombstone)); + + // Update key1 with a new value + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build())); + + // Trigger compaction via admin API + { + std::string compactUrl = + adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" + + std::to_string(time(nullptr)) + "/compaction"; + // Note: Compaction is async, we just trigger it + makePutRequest(compactUrl, ""); + } + + // Create a reader with readCompacted enabled + ReaderConfiguration readerConf; + readerConf.setReadCompacted(true); + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + // Read all messages and verify we can detect null values + std::map keyValues; + std::set nullValueKeys; + + for (int i = 0; i < 10; i++) { + bool hasMessageAvailable = false; + ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); + if (!hasMessageAvailable) { + break; + } + + Message msg; + Result res = reader.readNext(msg, 3000); + if (res != ResultOk) { + break; + } + + std::string key = msg.getPartitionKey(); + if (msg.hasNullValue()) { + nullValueKeys.insert(key); + LOG_INFO("Received null value (tombstone) for key: " << key); + } else { + keyValues[key] = msg.getDataAsString(); + LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString()); + } + } + + // Verify we received the tombstone for key2 + // Note: Without compaction completing, we see all messages including the tombstone + // After compaction, we would only see the latest value for each key + ASSERT_TRUE(nullValueKeys.count("key2") > 0 || keyValues.count("key2") == 0) + << "key2 should either have a null value or be absent after compaction"; + + producer.close(); + reader.close(); + client.close(); +} + +TEST(ReaderTest, testNullValueMessageProperties) { + Client client(serviceUrl); + + const std::string topicName = + "persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + // Send a null value message with properties + auto tombstone = MessageBuilder() + .setPartitionKey("user-123") + .setNullValue() + .setProperty("reason", "account-deleted") + .setProperty("deleted-by", "admin") + .build(); + + ASSERT_TRUE(tombstone.hasNullValue()); + ASSERT_EQ(tombstone.getPartitionKey(), "user-123"); + ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted"); + ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin"); + ASSERT_EQ(tombstone.getLength(), 0); + + ASSERT_EQ(ResultOk, producer.send(tombstone)); + + // Create a reader and verify the message + ReaderConfiguration readerConf; + Reader reader; + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); + + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 5000)); + + // Verify all properties are preserved + ASSERT_TRUE(msg.hasNullValue()); + ASSERT_EQ(msg.getPartitionKey(), "user-123"); + ASSERT_EQ(msg.getProperty("reason"), "account-deleted"); + ASSERT_EQ(msg.getProperty("deleted-by"), "admin"); + ASSERT_EQ(msg.getLength(), 0); + + producer.close(); + reader.close(); + client.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false)); diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc index b51b2c86..95df342b 100644 --- a/tests/TableViewTest.cc +++ b/tests/TableViewTest.cc @@ -157,6 +157,95 @@ TEST(TableViewTest, testPublishEmptyValue) { client.close(); } +TEST(TableViewTest, testNullValueTombstone) { + const std::string topic = "testNullValueTombstone" + std::to_string(time(nullptr)); + Client client(lookupUrl); + + ProducerConfiguration producerConfiguration; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + // Send initial messages with keys + auto count = 10; + for (int i = 0; i < count; ++i) { + auto msg = MessageBuilder() + .setPartitionKey("key" + std::to_string(i)) + .setContent("value" + std::to_string(i)) + .build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // Create table view and verify all keys are present + TableView tableView; + ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView)); + ASSERT_EQ(tableView.size(), count); + + std::string value; + ASSERT_TRUE(tableView.containsKey("key5")); + ASSERT_TRUE(tableView.getValue("key5", value)); + ASSERT_EQ(value, "value5"); + + // Send a null value (tombstone) for key5 using setNullValue() + auto tombstone = MessageBuilder().setPartitionKey("key5").setNullValue().build(); + ASSERT_TRUE(tombstone.hasNullValue()); + ASSERT_EQ(ResultOk, producer.send(tombstone)); + + // Wait for table view to process the tombstone and remove the key + waitUntil( + std::chrono::seconds(2), [&] { return !tableView.containsKey("key5"); }, 100); + + // Verify key5 was removed by the tombstone + ASSERT_FALSE(tableView.containsKey("key5")); + ASSERT_EQ(tableView.size(), count - 1); + + // Verify other keys are still present + ASSERT_TRUE(tableView.containsKey("key0")); + ASSERT_TRUE(tableView.containsKey("key9")); + + client.close(); +} + +TEST(TableViewTest, testNullValueVsEmptyString) { + const std::string topic = "testNullValueVsEmptyString" + std::to_string(time(nullptr)); + Client client(lookupUrl); + + ProducerConfiguration producerConfiguration; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + // Send messages for two keys + ASSERT_EQ(ResultOk, producer.send( + MessageBuilder().setPartitionKey("keyA").setContent("valueA").build())); + ASSERT_EQ(ResultOk, producer.send( + MessageBuilder().setPartitionKey("keyB").setContent("valueB").build())); + + TableView tableView; + ASSERT_EQ(ResultOk, client.createTableView(topic, {}, tableView)); + ASSERT_EQ(tableView.size(), 2); + + // Send empty string for keyA - this should also remove it from TableView + // (TableView treats empty payload as deletion) + auto emptyMsg = MessageBuilder().setPartitionKey("keyA").setContent("").build(); + ASSERT_FALSE(emptyMsg.hasNullValue()); + ASSERT_EQ(ResultOk, producer.send(emptyMsg)); + + // Send null value (tombstone) for keyB using setNullValue() + auto nullMsg = MessageBuilder().setPartitionKey("keyB").setNullValue().build(); + ASSERT_TRUE(nullMsg.hasNullValue()); + ASSERT_EQ(ResultOk, producer.send(nullMsg)); + + // Wait for both to be processed + waitUntil( + std::chrono::seconds(2), [&] { return tableView.size() == 0; }, 100); + + // Both keys should be removed + ASSERT_FALSE(tableView.containsKey("keyA")); + ASSERT_FALSE(tableView.containsKey("keyB")); + ASSERT_EQ(tableView.size(), 0); + + client.close(); +} + TEST(TableViewTest, testNotSupportNonPersistentTopic) { const std::string topic = TopicDomain::NonPersistent + "://public/default/testNotSupportNonPersistentTopic" +