Imagine a live stock market where a million trading bots need to know the exact millisecond a price drops. If they rely on a standard REST API (GET /price), they are forced to constantly “poll” the server, asking “Did it change yet?” thousands of times a second.
To survive this chaos, such systems abandon request/response HTTP and use a Message Broker instead. Each client establishes a single, persistent TCP connection and then sits in silence until the exact moment the server pushes the new data to everyone simultaneously, eliminating wasted requests and cutting latency down to the time it takes the bytes to cross the wire.
Understanding the components
The modern web is built like a vending machine: you put a coin in (HTTP Request), you get a snack out (HTTP Response), and the machine forgets you exist. It is completely stateless.
To build a real-time broker, we have to drop down to the transport layer and use raw TCP Sockets. Think of a TCP socket not as a vending machine, but as a telephone line. Once the handshake is complete, the pipe stays open indefinitely. Our Go server will keep these physical network pipes open in memory, allowing us to shove bytes down the wire to our clients at any moment.
fig 001. A Publisher pushing data to the Broker, which uses its internal Hash Table to broadcast the payload to all 3 Subscribers instantly.
Deciding the data structure
At its core, a message broker is just a high-speed switchboard. It doesn’t care about the contents of your data; it only cares about the Topic (the channel the data belongs to).
To make this work, the server needs a “brain” to remember which user is listening to which topic. In Go, the fastest way to achieve this O(1) lookup is an in-memory Hash Table (a map). The keys will be the topic strings (like "AAPL_stock" or "live_chat"), and the values will be arrays of net.Conn handles, each one pointing directly at an open network socket. This memory mapping is detailed in Fig. 002.
fig 002. Conceptual layout of the Broker's in-memory Hash Table, mapping topic strings to slices of network connection pointers.
type Broker struct {
	// Maps a topic string to an array of live network pipes
	Subscribers map[string][]net.Conn
	// Protects the map from simultaneous read/write crashes (explained later)
	Lock sync.RWMutex
}
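One thing the struct alone doesn't do is allocate the map: assigning to a key of a nil map is a runtime panic in Go. The post doesn't show a constructor, so here is a minimal sketch of one; the name `NewBroker` is my assumption.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

type Broker struct {
	Subscribers map[string][]net.Conn
	Lock        sync.RWMutex
}

// NewBroker allocates the inner map up front. Skipping this and writing
// b.Subscribers[topic] = ... on a nil map panics at runtime.
func NewBroker() *Broker {
	return &Broker{Subscribers: make(map[string][]net.Conn)}
}

func main() {
	b := NewBroker()
	b.Subscribers["test"] = append(b.Subscribers["test"], nil)
	fmt.Println(len(b.Subscribers["test"])) // 1
}
```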
Subscribing
When a client connects and sends a {"command": "SUB", "topic": "test"} payload, we need to save their live socket into our Hash Table.
In Go, we attach a method to our Broker struct. We find the specific topic in the map, and use the built-in append() function to add their net.Conn handle to the end of that topic's array.
func (b *Broker) Subscribe(topic string, conn net.Conn) {
	// Lock the map so no one else can touch it
	b.Lock.Lock()
	// Add the TCP socket to the specific topic's array
	b.Subscribers[topic] = append(b.Subscribers[topic], conn)
	// Unlock the map!
	b.Lock.Unlock()

	fmt.Printf("New client subscribed to topic: %s\n", topic)
}
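Before Subscribe can run, the broker has to decode the JSON payload the client sent. A sketch of that step: the `Command` struct and `parseCommand` helper are my naming, but the fields match the `{"command": ..., "topic": ..., "payload": ...}` wire format the post uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command mirrors the wire format from the post:
//   {"command": "SUB", "topic": "test"}
//   {"command": "PUB", "topic": "test", "payload": "100"}
type Command struct {
	Command string `json:"command"`
	Topic   string `json:"topic"`
	Payload string `json:"payload"`
}

// parseCommand decodes one raw JSON message from a client.
func parseCommand(raw []byte) (Command, error) {
	var cmd Command
	err := json.Unmarshal(raw, &cmd)
	return cmd, err
}

func main() {
	cmd, _ := parseCommand([]byte(`{"command":"SUB","topic":"test"}`))
	fmt.Println(cmd.Command, cmd.Topic) // SUB test
}
```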
Publishing
When an event actually happens—like the stock price dropping—a publisher sends a {"command": "PUB", "topic": "test", "payload": "100"} message.
The broker looks up "test" in the Hash Table, instantly retrieves the array of waiting network sockets, and loops through them, firing the bytes down the wire as fast as the CPU can process them.
func (b *Broker) Publish(topic string, payload string) {
	// Lock the map for READING only
	b.Lock.RLock()
	defer b.Lock.RUnlock()

	// Get the array of active TCP sockets for this specific topic
	subscribers := b.Subscribers[topic]

	// Build the message once, then shoot the bytes down every pipe!
	outgoingMsg := fmt.Sprintf("BROKER BROADCAST [%s]: %s\n", topic, payload)
	for _, conn := range subscribers {
		// Write returns an error for a dead socket; the memory-leak
		// section handles cleaning those up
		conn.Write([]byte(outgoingMsg))
	}
}
Why we need locks (The Concurrency Problem)
If you run this server at scale, you will immediately hit a race condition.
Go handles each incoming TCP connection in its own goroutine, a lightweight thread scheduled by the Go runtime. If 5 Publishers try to broadcast messages, and 2 Subscribers try to join the server at the exact same microsecond, 7 goroutines will attempt to access the exact same map in memory simultaneously. Go's runtime detects the unsynchronized access and kills the whole process with fatal error: concurrent map writes.
To solve this, we use a sync.RWMutex. This locking mechanism prevents multiple threads from modifying the same data at once, as illustrated in Fig. 003.
fig 003. Two Goroutines attempting a simultaneous write (SUB 'test') to the same memory location (Map Memory), resulting in a data race that a Mutex must resolve.
Notice that Publish uses RLock() (a read lock). This allows 100 different goroutines to read the Hash Table simultaneously for maximum speed, while a Subscriber calling Lock() to add a new connection gets exclusive access: it waits for every active reader to finish, and while it holds the write lock, everyone else waits until the lock is released.
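To see the lock earning its keep, here is a standalone stress test (my own demo, not from the post): 100 goroutines append under the same map key, and the mutex serializes them so every write survives. Delete the Lock/Unlock pair and the Go runtime aborts with "fatal error: concurrent map writes".

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentAppend has n goroutines append to one map key under a mutex
// and reports how many entries survived. With the lock, always n.
func concurrentAppend(n int) int {
	m := map[string][]int{}
	var lock sync.RWMutex
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			lock.Lock() // full write lock: appends mutate the map
			m["test"] = append(m["test"], v)
			lock.Unlock()
		}(i)
	}
	wg.Wait()
	return len(m["test"])
}

func main() {
	fmt.Println(concurrentAppend(100)) // 100
}
```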
Fixing the memory leak
If you spin this up right now, it works flawlessly until a client’s WiFi drops.
When a user disconnects, the OS kills their end of the network pipe. However, our Broker's Hash Table still holds a reference to that dead pipe! The next time someone publishes to that topic, the server will try to write to a closed socket and get an error back. Over time, your RAM fills up with dead connections that can never be reclaimed.
We have to clear that reference manually. Using Go's defer, we can trigger a cleanup routine the moment the TCP read loop breaks, removing the dead socket from our array. Once removed from the map, the connection becomes unreachable, allowing Go's Garbage Collector to finally sweep it out of RAM.
func handleConnection(conn net.Conn, b *Broker) {
	defer conn.Close()
	var myTopics []string

	// Triggered instantly when the client disconnects
	defer func() {
		for _, topic := range myTopics {
			b.RemoveSubscriber(topic, conn)
		}
	}()

	// ... (Read incoming bytes loop here) ...
}
Where do we go from here?
If you wanted to push this engine even further, you could build a hybrid load balancer that mimics RabbitMQ by adding Worker Queues (Consumer Groups), allowing backend servers to share heavy task workloads via a round-robin algorithm. That is exactly what I'm building after this blog post; you can track the progress in my repo, muque.