A Kafka-inspired message broker written in Go. This repository implements a durable, file-backed messaging system with a TCP protocol, segment-based log storage, consumer group offset tracking, and startup recovery.
This project is intentionally smaller than Apache Kafka, but it follows several of the same core ideas:
- messages are appended to an immutable log
- data is stored on disk rather than only in memory
- offsets identify a consumer's position in the stream
- consumer groups advance independently
- the broker can recover its state after a restart
- logs rotate into segments so the system can grow without a single infinite file
If you are familiar with Kafka, the easiest way to understand this project is to think of it as a simplified single-broker version of Kafka's storage and consumption model. It does not implement topics, partitions, replication, leader election, rebalancing, or compression yet, but it does implement the foundations that make a log-based broker useful.
Apache Kafka is a distributed event streaming platform. At a high level, Kafka is used to move data from producers to consumers reliably and at scale.
Kafka's most important ideas are:
- Producers write records into Kafka.
- Consumers read records from Kafka.
- Topics are named streams of records.
- Partitions split a topic into ordered shards for parallelism and throughput.
- Offsets identify a record's position inside a partition; a consumer tracks its progress as an offset.
- Consumer groups coordinate multiple consumers so work can be shared.
- Replication keeps multiple copies of data for fault tolerance.
- Segments break a long log into manageable files.
- Retention removes older data according to time or size rules.
Kafka is not just a queue. It is a distributed log with replayable history. That design gives it durability, high throughput, and the ability for multiple consumers to read the same data independently.
This repository is a compact implementation of the same log-centric model.
It currently provides:
- a TCP broker listening on :8080
- a producer client that appends messages
- a consumer client that reads messages using broker-managed offsets
- on-disk storage with .log and .idx files
- segment rotation when a log grows too large
- startup discovery and recovery of all segments
- consumer group offset persistence
- a versioned binary protocol with request and response headers
- concurrent access using sync.RWMutex
It intentionally does not yet provide:
- topics
- partitions
- replication
- leader/follower behavior
- rebalancing
- batching
- compression
- retention policies
- durable cluster membership
The implementation is therefore best understood as a single-broker, file-backed message log with Kafka-like semantics.
message-broker-system/
├── cmd/
│   ├── consumer/
│   │   └── main.go
│   ├── producer/
│   │   └── main.go
│   └── server/
│       └── main.go
├── internal/
│   ├── protocol/
│   │   └── protocol.go
│   ├── server/
│   │   └── handler.go
│   └── store/
│       └── store.go
├── data/
│   └── log/
├── go.mod
├── .gitignore
└── README.md
The system is organized into three primary layers.
The cmd/ directory contains the executable entry points:
- cmd/server/main.go starts the broker
- cmd/producer/main.go sends messages to the broker
- cmd/consumer/main.go fetches and commits messages as a consumer group
The internal/protocol/ and internal/server/ packages handle network framing and command dispatch.
- internal/protocol/protocol.go defines the binary request and response headers
- internal/server/handler.go reads requests, decodes commands, and invokes the store
The internal/store/ package owns persistence, recovery, rotation, and offset lookup.
- internal/store/store.go manages all file I/O and segment metadata
cmd/server/main.go starts the broker process.
What it does:
- Creates the storage engine by calling store.New("./data/log", "messages").
- Opens a TCP listener on port 8080.
- Accepts incoming client connections in a loop.
- Spawns a goroutine for each connection and hands it to server.HandleConn.
Why it matters:
- This is the broker entry point.
- It wires together storage and network handling.
- The same store instance is shared across all connections.
The server is intentionally simple: it focuses on being a long-running broker process rather than a web service or HTTP API.
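The accept loop described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the repository's code: handleConn is a stand-in for server.HandleConn that simply acknowledges text lines, and roundTrip is a hypothetical helper that listens on an ephemeral loopback port so the example does not collide with a broker already running on 8080.

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
)

// handleConn stands in for server.HandleConn (assumption): it acknowledges
// each text line instead of decoding the binary protocol.
func handleConn(conn net.Conn) {
	defer conn.Close()
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Fprintf(conn, "ack %s\n", scanner.Text())
	}
}

// serve accepts connections until the listener is closed and starts one
// goroutine per connection, mirroring the loop described in the text.
func serve(ln net.Listener) {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return // listener closed
		}
		go handleConn(conn)
	}
}

// roundTrip (hypothetical helper) starts a listener on an ephemeral port,
// sends one line, and returns the reply.
func roundTrip(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()
	go serve(ln)

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprintf(conn, "%s\n", msg); err != nil {
		return "", err
	}
	return bufio.NewReader(conn).ReadString('\n')
}

func main() {
	reply, err := roundTrip("hello")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(reply)
}
```

In the real broker, every goroutine would receive the same shared store instance alongside the connection.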
cmd/producer/main.go is the producer client.
What it does:
- Dials the broker at localhost:8080.
- Creates a list of example messages.
- For each message, builds a request header and payload.
- Sends a CommandProduce request.
- Reads the broker response header.
- Reads the returned message offset.
- Prints the acknowledged offset.
Important behavior:
- Each request gets a correlation ID.
- The client sends a structured protocol message rather than a raw byte stream.
- The broker returns the assigned offset after the append succeeds.
In Kafka terms, this is the producer side of the system. It is responsible for writing records into the broker's log.
cmd/consumer/main.go is the consumer client.
What it does:
- Dials the broker at localhost:8080.
- Uses a fixed consumer group ID.
- Builds a CommandFetch request for the broker.
- Receives the next available message for that group.
- Prints the record and its offset.
- Commits the offset back to the broker using CommandCommit.
Important behavior:
- The consumer no longer manually increments offsets on its own.
- It asks the broker for the next offset using the group ID.
- This means the broker owns the persisted consumer progress.
- A restart of the consumer does not lose its last committed position.
In Kafka terms, this is a consumer group client reading from a log and committing progress.
internal/protocol/protocol.go defines the binary wire protocol.
It contains:
- the protocol version constant
- command constants
- request and response header types
- header serialization and deserialization helpers
Why it exists:
A broker protocol should not be an ad hoc byte stream. A structured protocol makes the system easier to extend, debug, and evolve.
Current protocol features:
- protocol versioning
- correlation IDs
- client IDs
- command codes
- explicit payload lengths
- response status codes
The protocol is intentionally small, but it establishes the foundation for later additions such as batching, metadata requests, and partition-aware routing.
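To make the header fields listed above concrete, here is a rough sketch of how a request header could be encoded and decoded. The field order, field widths, big-endian byte order, and the value of CommandProduce are all assumptions made for illustration; the actual layout in internal/protocol/protocol.go may differ.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	ProtocolVersion uint16 = 1 // hypothetical value
	CommandProduce  uint16 = 1 // hypothetical value
)

// RequestHeader is an assumed layout, not the repository's definition.
type RequestHeader struct {
	Version       uint16
	Command       uint16
	CorrelationID uint32
	ClientID      string
	PayloadLen    uint32
}

// Encode writes
// [version:2][command:2][correlationID:4][clientIDLen:2][clientID][payloadLen:4].
func (h RequestHeader) Encode() []byte {
	buf := make([]byte, 0, 14+len(h.ClientID))
	buf = binary.BigEndian.AppendUint16(buf, h.Version)
	buf = binary.BigEndian.AppendUint16(buf, h.Command)
	buf = binary.BigEndian.AppendUint32(buf, h.CorrelationID)
	buf = binary.BigEndian.AppendUint16(buf, uint16(len(h.ClientID)))
	buf = append(buf, h.ClientID...)
	buf = binary.BigEndian.AppendUint32(buf, h.PayloadLen)
	return buf
}

// DecodeRequestHeader reverses Encode and rejects short or truncated input.
func DecodeRequestHeader(b []byte) (RequestHeader, error) {
	var h RequestHeader
	if len(b) < 10 {
		return h, errors.New("header too short")
	}
	h.Version = binary.BigEndian.Uint16(b[0:2])
	h.Command = binary.BigEndian.Uint16(b[2:4])
	h.CorrelationID = binary.BigEndian.Uint32(b[4:8])
	idLen := int(binary.BigEndian.Uint16(b[8:10]))
	if len(b) < 10+idLen+4 {
		return h, errors.New("header truncated")
	}
	h.ClientID = string(b[10 : 10+idLen])
	h.PayloadLen = binary.BigEndian.Uint32(b[10+idLen:])
	return h, nil
}

func main() {
	h := RequestHeader{
		Version:       ProtocolVersion,
		Command:       CommandProduce,
		CorrelationID: 42,
		ClientID:      "producer-1",
		PayloadLen:    5,
	}
	decoded, err := DecodeRequestHeader(h.Encode())
	fmt.Println(decoded == h, err)
}
```

The explicit payload length lets the reader know exactly how many bytes follow the header, and the correlation ID lets a client match a response to the request that caused it.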
internal/server/handler.go handles all network requests after a connection is accepted.
Main responsibilities:
- read request headers
- read request payloads
- dispatch commands to the store
- write structured responses back to the client
Supported commands:
- CommandProduce
- CommandConsume
- CommandCommit
- CommandFetch
Each handler is responsible for validating its payload, invoking the store, and returning a well-formed response.
This is the request/response control plane of the broker. It is where network traffic becomes application behavior.
internal/store/store.go is the core of the system.
It contains:
- the Store type
- the Segment type
- segment discovery logic
- rotation logic
- append logic
- read logic
- consumer offset storage
- recovery logic
This file is the heart of the broker because it defines how data is stored, found, recovered, and rotated.
The Store type keeps all broker state in one place.
Relevant fields:
- dirPath: base directory for log files
- baseName: file prefix, currently messages
- segments: all known segments on disk
- active: the segment currently receiving appends
- nextID: the next message offset to assign
- maxBytes: maximum size of one segment before rotation
- currentSize: current size of the active segment
- mu: sync.RWMutex for concurrency control
A segment represents one pair of log and index files.
Fields:
- BaseOffset: the first message ID contained in the segment
- LogFile: the .log file handle
- IdxFile: the .idx file handle
This is the key abstraction that fixes the segment-rotation bug from the earlier version. Instead of assuming all reads come from the active file, the store now knows how to locate the correct segment for any offset.
The log file stores the actual record payloads.
Each record is written as:
[length:4 bytes][payload bytes]
Example:
00 00 00 05 68 65 6c 6c 6f
That means:
- length = 5
- payload = hello
This format is simple, compact, and allows variable-length messages.
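A length-prefixed record like this can be framed and read back in a few lines. The sketch below assumes big-endian byte order, which matches the 00 00 00 05 example above; the function names frame, readRecord, and roundTrip are hypothetical and not taken from the repository.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"log"
)

// frame builds one log record: [length:4 bytes][payload bytes].
func frame(payload []byte) []byte {
	rec := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(rec, uint32(len(payload)))
	copy(rec[4:], payload)
	return rec
}

// readRecord reads exactly one record from r, using the length prefix to
// decide how many payload bytes to consume.
func readRecord(r io.Reader) ([]byte, error) {
	var lenBuf [4]byte
	if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(lenBuf[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip frames a string and reads it back through readRecord.
func roundTrip(s string) (string, error) {
	payload, err := readRecord(bytes.NewReader(frame([]byte(s))))
	return string(payload), err
}

func main() {
	fmt.Printf("% x\n", frame([]byte("hello"))) // 00 00 00 05 68 65 6c 6c 6f
	got, err := roundTrip("hello")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // hello
}
```

Because io.ReadFull returns an error on a short read, a record whose payload was only partially written is detected rather than silently returned.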
The index file maps message offsets to positions inside the corresponding log.
Each index entry is:
[messageID:8 bytes][logPosition:8 bytes]
Example:
[ID=12][pos=0]
[ID=13][pos=9]
[ID=14][pos=21]
This index allows the broker to find a record without scanning the entire log file.
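Because every entry is a fixed 16 bytes, the entry for a message can be found by arithmetic rather than by searching. The sketch below works on an in-memory byte slice for brevity (the store itself reads from the .idx file), assumes big-endian encoding, and uses hypothetical helper names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"log"
)

const entrySize = 16 // [messageID:8 bytes][logPosition:8 bytes]

// appendEntry adds one index entry to idx.
func appendEntry(idx []byte, id, pos uint64) []byte {
	idx = binary.BigEndian.AppendUint64(idx, id)
	return binary.BigEndian.AppendUint64(idx, pos)
}

// lookup returns the log position for id in a segment whose first message
// ID is baseOffset. The entry sits at (id - baseOffset) * entrySize.
func lookup(idx []byte, baseOffset, id uint64) (uint64, error) {
	if id < baseOffset {
		return 0, errors.New("id precedes this segment")
	}
	start := (id - baseOffset) * entrySize
	if start+entrySize > uint64(len(idx)) {
		return 0, errors.New("id not present in this segment")
	}
	entry := idx[start : start+entrySize]
	if got := binary.BigEndian.Uint64(entry[:8]); got != id {
		return 0, fmt.Errorf("index corrupt: found id %d, want %d", got, id)
	}
	return binary.BigEndian.Uint64(entry[8:]), nil
}

// exampleIndex reproduces the three entries shown in the text.
func exampleIndex() []byte {
	var idx []byte
	idx = appendEntry(idx, 12, 0)
	idx = appendEntry(idx, 13, 9)
	idx = appendEntry(idx, 14, 21)
	return idx
}

func main() {
	pos, err := lookup(exampleIndex(), 12, 13)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(pos) // 9
}
```

The stored message ID is redundant for lookup, but comparing it against the requested ID gives a cheap consistency check.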
Consumer group offsets are stored in files named after the group.
Format:
[offset:8 bytes]
Example:
- worker-group-1.offset
- analytics.offset
This lets consumer groups resume from the correct position after a restart.
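Persisting a group offset in this format amounts to writing and reading eight bytes. The following is a minimal sketch assuming big-endian encoding; commitOffset, loadOffset, and commitAndLoad are hypothetical names, and the example writes to a temporary directory rather than data/log.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
)

func offsetPath(dir, groupID string) string {
	return filepath.Join(dir, groupID+".offset")
}

// commitOffset overwrites {groupID}.offset with the 8-byte offset and
// syncs it so the value survives a crash.
func commitOffset(dir, groupID string, offset uint64) error {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], offset)
	f, err := os.OpenFile(offsetPath(dir, groupID), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Write(buf[:]); err != nil {
		return err
	}
	return f.Sync()
}

// loadOffset returns ok=false when the group has never committed.
func loadOffset(dir, groupID string) (offset uint64, ok bool, err error) {
	data, err := os.ReadFile(offsetPath(dir, groupID))
	if errors.Is(err, fs.ErrNotExist) {
		return 0, false, nil
	}
	if err != nil {
		return 0, false, err
	}
	if len(data) != 8 {
		return 0, false, fmt.Errorf("offset file for %q has %d bytes, want 8", groupID, len(data))
	}
	return binary.BigEndian.Uint64(data), true, nil
}

// commitAndLoad exercises both functions in a throwaway directory.
func commitAndLoad(groupID string, offset uint64) (uint64, error) {
	dir, err := os.MkdirTemp("", "offsets")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	if err := commitOffset(dir, groupID, offset); err != nil {
		return 0, err
	}
	got, ok, err := loadOffset(dir, groupID)
	if err != nil {
		return 0, err
	}
	if !ok {
		return 0, errors.New("offset file missing after commit")
	}
	return got, nil
}

func main() {
	got, err := commitAndLoad("worker-group-1", 41)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // 41
}
```

Distinguishing "no file yet" from "offset 0" matters: a group that has never committed should start at the first message, not skip it.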
When a producer sends a message:
- The client creates a request header with version, correlation ID, client ID, command, and payload length.
- The broker reads the header and payload.
- The store checks whether the active segment is full.
- If necessary, the store rotates to a new segment.
- The payload is appended to the active log.
- The log position is stored in the active index.
- Both files are synced to disk.
- The broker returns the assigned message ID.
This flow makes message writes durable and recoverable.
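The append portion of that flow (write the record, write the index entry, sync both, return the ID) might look roughly like the sketch below. It deliberately omits rotation and locking, assumes big-endian encoding, and uses a hypothetical miniLog type rather than the repository's Store.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// miniLog is a hypothetical, single-segment stand-in for the store.
type miniLog struct {
	logFile, idxFile *os.File
	nextID           uint64 // next message ID to assign
	size             uint64 // bytes already written to the log file
}

func openMiniLog(dir string) (*miniLog, error) {
	flags := os.O_CREATE | os.O_RDWR | os.O_APPEND
	lf, err := os.OpenFile(filepath.Join(dir, "messages.log"), flags, 0o644)
	if err != nil {
		return nil, err
	}
	xf, err := os.OpenFile(filepath.Join(dir, "messages.idx"), flags, 0o644)
	if err != nil {
		lf.Close()
		return nil, err
	}
	return &miniLog{logFile: lf, idxFile: xf}, nil
}

func (m *miniLog) Close() {
	m.logFile.Close()
	m.idxFile.Close()
}

// Append writes the record and its index entry, syncs both files, and only
// then advances nextID and size.
func (m *miniLog) Append(payload []byte) (uint64, error) {
	record := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(record, uint32(len(payload)))
	copy(record[4:], payload)

	var entry [16]byte
	binary.BigEndian.PutUint64(entry[:8], m.nextID)
	binary.BigEndian.PutUint64(entry[8:], m.size) // record starts at the current end of the log

	if _, err := m.logFile.Write(record); err != nil {
		return 0, err
	}
	if _, err := m.idxFile.Write(entry[:]); err != nil {
		return 0, err
	}
	if err := m.logFile.Sync(); err != nil {
		return 0, err
	}
	if err := m.idxFile.Sync(); err != nil {
		return 0, err
	}
	id := m.nextID
	m.nextID++
	m.size += uint64(len(record))
	return id, nil
}

// appendAll appends payloads to a fresh log in a temporary directory.
func appendAll(payloads ...string) (ids []uint64, size uint64, err error) {
	dir, err := os.MkdirTemp("", "minilog")
	if err != nil {
		return nil, 0, err
	}
	defer os.RemoveAll(dir)
	m, err := openMiniLog(dir)
	if err != nil {
		return nil, 0, err
	}
	defer m.Close()
	for _, p := range payloads {
		id, err := m.Append([]byte(p))
		if err != nil {
			return nil, 0, err
		}
		ids = append(ids, id)
	}
	return ids, m.size, nil
}

func main() {
	ids, size, err := appendAll("hello", "kafka-ish")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ids, size) // [0 1] 22
}
```

Writing the log before the index means a crash between the two writes leaves an unindexed trailing record, which startup recovery can detect and truncate.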
When a consumer requests a message by offset:
- The broker receives a consume request.
- The store locates the correct segment using the offset.
- The segment-local index offset is computed.
- The log position is read from the index.
- The payload is read from the log.
- The broker returns the payload.
This is the direct read path by ID.
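Locating the segment and computing the segment-local index offset can be sketched with a binary search over sorted base offsets. The names findSegment and locate are hypothetical, and the 16-byte entry size follows the index format described earlier.

```go
package main

import (
	"fmt"
	"sort"
)

const entrySize = 16

// findSegment returns the position of the segment with the greatest base
// offset that is <= id. bases must be sorted in ascending order.
func findSegment(bases []uint64, id uint64) (int, bool) {
	// sort.Search finds the first base offset strictly greater than id;
	// the segment just before it is the one that could contain id.
	i := sort.Search(len(bases), func(i int) bool { return bases[i] > id })
	if i == 0 {
		return 0, false // id is older than every known segment
	}
	return i - 1, true
}

// locate returns which segment to read and the byte position of the
// entry for id inside that segment's index file.
func locate(bases []uint64, id uint64) (segIdx int, idxPos uint64, ok bool) {
	segIdx, ok = findSegment(bases, id)
	if !ok {
		return 0, 0, false
	}
	return segIdx, (id - bases[segIdx]) * entrySize, true
}

func main() {
	bases := []uint64{0, 5, 12} // three segments starting at IDs 0, 5, and 12
	seg, pos, ok := locate(bases, 13)
	fmt.Println(seg, pos, ok) // 2 16 true
}
```

This only picks the candidate segment; whether the ID has actually been written is decided afterwards by checking that the computed position falls inside the index file.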
When a consumer group fetches the next message:
- The broker reads the consumer group ID.
- The store loads the last committed offset for that group.
- The next unread offset is computed.
- The broker attempts to read that message.
- If a record exists, it is returned with its offset.
- If no record exists yet, the broker returns the next offset to wait for.
This is the broker-managed offset flow and is the closer equivalent to Kafka consumer behavior.
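The fetch and commit interaction can be modelled in memory to show how the next offset is derived. This sketch assumes the committed value is the last consumed offset, so the next fetch starts at committed + 1, and that a group with no commit starts at 0; the broker type here is hypothetical and keeps everything in maps and slices instead of files.

```go
package main

import "fmt"

// broker is a hypothetical in-memory model of the fetch/commit flow.
type broker struct {
	records   []string          // index in the slice is the message offset
	committed map[string]uint64 // last committed offset per group
}

// nextOffset returns the first offset the group has not consumed yet.
func (b *broker) nextOffset(group string) uint64 {
	last, ok := b.committed[group]
	if !ok {
		return 0 // group has never committed
	}
	return last + 1
}

// Fetch returns the next record for the group, or found=false together
// with the offset the consumer should wait for.
func (b *broker) Fetch(group string) (offset uint64, payload string, found bool) {
	offset = b.nextOffset(group)
	if offset >= uint64(len(b.records)) {
		return offset, "", false
	}
	return offset, b.records[offset], true
}

// Commit records that the group has finished processing offset.
func (b *broker) Commit(group string, offset uint64) {
	b.committed[group] = offset
}

// drain fetches and commits until the group has caught up.
func drain(b *broker, group string) []string {
	var out []string
	for {
		offset, payload, found := b.Fetch(group)
		if !found {
			return out
		}
		out = append(out, payload)
		b.Commit(group, offset)
	}
}

func main() {
	b := &broker{records: []string{"a", "b", "c"}, committed: map[string]uint64{}}
	fmt.Println(drain(b, "worker-group-1")) // [a b c]
	offset, _, found := b.Fetch("worker-group-1")
	fmt.Println(offset, found) // 3 false
}
```

Each group keeps its own entry in the committed map, which is why a second group still starts from offset 0 after the first has drained the log.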
When a consumer commits progress:
- The broker reads the group ID and offset.
- The store writes the offset to {groupID}.offset.
- The offset is durable on disk.
- Future fetches resume from the next position.
This is how consumer progress survives restarts.
The system rotates segments when the active log file grows beyond maxBytes.
Current threshold:
maxBytes = 1MB
Rotation process:
- Close the current active segment files.
- Rename the active .log and .idx files to include the starting offset.
- Create a fresh messages.log and messages.idx pair.
- Register the new segment as the active one.
Why this matters:
- log files stay bounded in size
- old data remains readable
- startup recovery becomes manageable
- the broker can locate records by segment
This is one of the most important differences between an append-only toy implementation and a real log-backed broker.
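The rename-and-recreate part of rotation is sketched below. The ten-digit zero padding is inferred from the example file name messages-0000000005.log; rotatedName, rotate, and rotateDemo are hypothetical helpers, and the real store would also close its open handles first and keep handles to the fresh files.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// rotatedName builds a name such as messages-0000000005.log.
func rotatedName(baseName string, baseOffset uint64, ext string) string {
	return fmt.Sprintf("%s-%010d%s", baseName, baseOffset, ext)
}

// rotate renames the active .log/.idx pair to include baseOffset and
// creates an empty pair under the active names.
func rotate(dir, baseName string, baseOffset uint64) error {
	for _, ext := range []string{".log", ".idx"} {
		active := filepath.Join(dir, baseName+ext)
		sealed := filepath.Join(dir, rotatedName(baseName, baseOffset, ext))
		if err := os.Rename(active, sealed); err != nil {
			return err
		}
		f, err := os.Create(active)
		if err != nil {
			return err
		}
		if err := f.Close(); err != nil {
			return err
		}
	}
	return nil
}

// rotateDemo rotates a throwaway segment and lists the resulting files.
func rotateDemo() ([]string, error) {
	dir, err := os.MkdirTemp("", "rotate")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	for _, ext := range []string{".log", ".idx"} {
		if err := os.WriteFile(filepath.Join(dir, "messages"+ext), []byte("x"), 0o644); err != nil {
			return nil, err
		}
	}
	if err := rotate(dir, "messages", 5); err != nil {
		return nil, err
	}
	entries, err := os.ReadDir(dir) // sorted by file name
	if err != nil {
		return nil, err
	}
	var names []string
	for _, e := range entries {
		names = append(names, e.Name())
	}
	return names, nil
}

func main() {
	names, err := rotateDemo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(names)
}
```

Zero padding keeps lexical and numeric ordering of file names identical, which makes sorted discovery on startup straightforward.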
On startup, the store does not assume a clean shutdown.
It performs three important tasks:
- Discover segments
  - Scan data/log
  - Find all matching log and index files
  - Parse their base offsets
  - Open them in sorted order
- Recover the next offset
  - Inspect the index files
  - Find the highest committed message ID
  - Set nextID to the next available value
- Validate the WAL
  - Check index/log consistency in the active segment
  - Truncate incomplete trailing writes if necessary
This recovery path is what prevents the broker from losing logical continuity after a restart.
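Two of these recovery steps lend themselves to a short sketch: parsing base offsets out of rotated file names, and deriving the next ID from an index while discarding a partially written trailing entry. The function names are hypothetical (the repository's own entry points are discoverSegments() and recoverState()), the file name pattern follows the rotated-file example, and the index is a byte slice rather than a file. Checking the log against the index is not covered here.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

const entrySize = 16

// parseBaseOffset extracts 5 from "messages-0000000005.log".
func parseBaseOffset(baseName, fileName string) (uint64, bool) {
	rest, ok := strings.CutPrefix(fileName, baseName+"-")
	if !ok {
		return 0, false
	}
	digits, ok := strings.CutSuffix(rest, ".log")
	if !ok {
		return 0, false
	}
	n, err := strconv.ParseUint(digits, 10, 64)
	return n, err == nil
}

// discover returns the base offsets of all rotated segments, sorted.
func discover(baseName string, fileNames []string) []uint64 {
	var bases []uint64
	for _, name := range fileNames {
		if base, ok := parseBaseOffset(baseName, name); ok {
			bases = append(bases, base)
		}
	}
	sort.Slice(bases, func(i, j int) bool { return bases[i] < bases[j] })
	return bases
}

// recoverNextID drops an incomplete trailing entry and returns the trimmed
// index together with the next message ID to assign.
func recoverNextID(idx []byte, baseOffset uint64) (trimmed []byte, nextID uint64) {
	trimmed = idx[:len(idx)-len(idx)%entrySize]
	if len(trimmed) == 0 {
		return trimmed, baseOffset // empty segment: first ID is its base offset
	}
	last := trimmed[len(trimmed)-entrySize:]
	return trimmed, binary.BigEndian.Uint64(last[:8]) + 1
}

// buildIndex creates index bytes for the given IDs (positions set to 0).
func buildIndex(ids ...uint64) []byte {
	var idx []byte
	for _, id := range ids {
		idx = binary.BigEndian.AppendUint64(idx, id)
		idx = binary.BigEndian.AppendUint64(idx, 0)
	}
	return idx
}

func main() {
	files := []string{"messages.log", "messages-0000000005.log", "messages-0000000000.log"}
	fmt.Println(discover("messages", files)) // [0 5]

	torn := append(buildIndex(12, 13), 0xFF, 0xFF, 0xFF) // 3 stray bytes from a crash
	trimmed, next := recoverNextID(torn, 12)
	fmt.Println(len(trimmed), next) // 32 14
}
```

The active pair (messages.log and messages.idx) carries no offset in its name, so its base offset has to come from elsewhere, for example from the first entry in its index.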
The store uses sync.RWMutex.
Why this matters:
- multiple readers can access the store concurrently
- writes still remain exclusive
- reads do not block each other unnecessarily
- the broker is better suited for concurrent consumer traffic
Lock usage:
- Append() uses an exclusive lock
- ReadByID() uses a read lock
- segment rotation happens under the write path
This is a better fit for a log system than a plain sync.Mutex because read traffic is typically much higher than write traffic.
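The locking pattern can be illustrated with a small in-memory store. The method names Append and ReadByID come from the text above; the store type, its fields, and the hammer helper are simplified stand-ins and not the repository's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a simplified in-memory stand-in for the file-backed Store.
type store struct {
	mu      sync.RWMutex
	records [][]byte
}

// Append takes the exclusive lock: only one writer at a time, and no
// readers while the slice is being modified.
func (s *store) Append(payload []byte) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records = append(s.records, append([]byte(nil), payload...))
	return uint64(len(s.records) - 1)
}

// ReadByID takes the shared lock: many readers may hold it at once.
func (s *store) ReadByID(id uint64) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if id >= uint64(len(s.records)) {
		return nil, false
	}
	return s.records[id], true
}

// Len reports how many records have been appended.
func (s *store) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.records)
}

// hammer runs concurrent writers and readers and returns the final count.
func hammer(writers, perWriter, readers int) int {
	s := &store{}
	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				s.Append([]byte("msg"))
			}
		}()
	}
	for r := 0; r < readers; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				s.ReadByID(uint64(i)) // may miss if not appended yet; that is fine here
			}
		}()
	}
	wg.Wait()
	return s.Len()
}

func main() {
	fmt.Println(hammer(4, 100, 8)) // 400
}
```

Running a program like this with the race detector (go run -race) is a quick way to confirm that no access bypasses the lock.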
The broker listens on TCP port 8080.
Request handling is connection-based rather than HTTP-based.
That means:
- the protocol is binary, not JSON
- the broker can keep a connection open for multiple requests
- every request includes a command and payload length
- responses include status and payload length
This is closer to how a low-level broker protocol works in practice.
This project mirrors Kafka in a simplified form.
- Broker → cmd/server/main.go plus internal/server/handler.go
- Producer → cmd/producer/main.go
- Consumer → cmd/consumer/main.go
- Log segment → Segment in internal/store/store.go
- Offset → assigned message ID
- Consumer group offset → {groupID}.offset
- Append-only log → .log file
- Index → .idx file
- Recovery → discoverSegments() and recoverState()
Kafka features that this project does not implement yet:
- multiple topics
- partitions per topic
- replication
- leader election
- follower synchronization
- consumer group rebalance
- heartbeat/session timeouts
- offset commit APIs compatible with Kafka clients
- retention policies
- compression
- transactional semantics
- zero-copy network optimizations
Those are all reasonable future additions, but the current codebase is focused on getting the storage and offset model correct first.
- Go 1.26.2 or compatible Go toolchain
- TCP port 8080 available
From the project root:
go build ./cmd/server
go build ./cmd/producer
go build ./cmd/consumer
Then start the broker:
./server
The broker will start on :8080 and create its storage under ./data/log.
In a second terminal:
./producer
This sends a small set of sample messages to the broker.
In a third terminal:
./consumer
This fetches messages for the configured group and commits offsets back to the broker.
Runtime data lives under data/log.
Expected files include:
- messages.log
- messages.idx
- rotated files such as messages-0000000005.log
- consumer offset files such as worker-group-1.offset
These files are generated at runtime and should not be committed to source control.
- The broker intentionally uses a minimal file format to keep the storage model easy to reason about.
- The code favors explicit logic over abstraction-heavy design.
- The protocol is binary from the start, so the system can evolve without a later migration away from text-based framing.
- The consumer client demonstrates broker-managed fetch and commit behavior rather than manual offset tracking.
These are not bugs in the current design, but they are important constraints:
- only one broker process exists
- there is no replication
- there are no partitions
- there is no retention policy
- batching is not implemented
- compression is not implemented
- protocol framing is simple and does not yet include all Kafka-style metadata