Dustin Sallings home
poolin' gophers

Pooling in Go

A few people on IRC have asked how to build an object pool. I’ve needed one in the past and came up with a design I rather like, so I’ve walked people through the code a few times.

Since I’m at GopherCon and had a similar discussion today, I figured I’d write up a detailed description of how my code works, how it evolved, and why it is the way it is now.

Channels as a Pool

You can think of a buffered channel in Go as a thread-safe queue. It also has a nice property: a send blocks while the buffer is full, and a receive blocks while it’s empty.

Channels practically do all the work for you.
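To make the queue behaviour concrete, here is a minimal sketch using a plain chan int. The helper names tryPut and tryTake are made up for this illustration; they wrap a send and a receive in a select with a default case so that "would block" shows up as a false result instead of a stalled goroutine.

```go
package main

// tryPut attempts a non-blocking send. It reports false when the buffer
// is full, which is exactly when a plain send would block.
func tryPut(q chan int, v int) bool {
	select {
	case q <- v:
		return true
	default:
		return false
	}
}

// tryTake attempts a non-blocking receive. ok is false when the buffer
// is empty, which is exactly when a plain receive would block.
func tryTake(q chan int) (v int, ok bool) {
	select {
	case v = <-q:
		return v, true
	default:
		return 0, false
	}
}
```

With q := make(chan int, 2), two calls to tryPut succeed, a third reports false, and tryTake then hands the values back in the order they went in.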

For example, the core of the first version of the pool I wrote for go-couchbase follows. cp.connections is a buffered channel of *memcached.Client. cp.mkConn will build a new connection whenever one is needed.

func (cp *connectionPool) Get() (*memcached.Client, error) {
	select {
	case rv := <-cp.connections:
		// Existing connection
		return rv, nil
	case <-time.After(time.Millisecond):
		// No existing connection, let's make one
		return cp.mkConn(cp.host, cp.name)
	}
}

func (cp *connectionPool) Return(c *memcached.Client) {
	select {
	case cp.connections <- c:
	default:
		// Overflow connection
		c.Close()
	}
}

The first time you try to get a connection from this pool, the channel is empty, so Get hits the delay path, opens a connection, and returns it. While that connection is in use, a second caller would do the same thing.

Returning a connection is simply a non-blocking send back into the channel. The reason it’s non-blocking is subtle but simple: if there’s space in the channel, we keep the connection. If there’s no space, the connection is overflow beyond what we want to keep in the pool, so we close it immediately.

Problems?

This served me well for quite a while, but one of my users had an application in which hundreds of goroutines simultaneously wanted to perform a quick DB operation. All of them timed out instantly, all of them opened new connections and then all but the pool size (~5?) closed them because they were overflow.
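The stampede is easy to reproduce outside go-couchbase. The following is a sketch, not the original code: conn is a hypothetical stand-in for *memcached.Client, the counters exist only for the demonstration, and stampede is a made-up helper that forces n goroutines to hold a connection at the same moment.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// conn is a hypothetical stand-in for *memcached.Client.
type conn struct{}

// pool is a stripped-down copy of the first design: idle connections
// live in a buffered channel and nothing limits how many get opened.
type pool struct {
	idle           chan *conn
	opened, closed int64 // counters for the demonstration only
}

func (p *pool) Get() *conn {
	select {
	case c := <-p.idle:
		return c
	case <-time.After(time.Millisecond):
		atomic.AddInt64(&p.opened, 1)
		return &conn{}
	}
}

func (p *pool) Return(c *conn) {
	select {
	case p.idle <- c:
	default:
		// Overflow: a real pool would call c.Close() here.
		atomic.AddInt64(&p.closed, 1)
	}
}

// stampede has n goroutines hold a connection at the same time against
// an empty pool of the given size, then reports how many connections
// were opened and how many were closed as overflow.
func stampede(n, size int) (opened, closed int64) {
	p := &pool{idle: make(chan *conn, size)}
	var got, done sync.WaitGroup
	got.Add(n)
	done.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer done.Done()
			c := p.Get()
			got.Done()
			got.Wait() // hold until every goroutine has a connection
			p.Return(c)
		}()
	}
	done.Wait()
	return atomic.LoadInt64(&p.opened), atomic.LoadInt64(&p.closed)
}
```

Because nobody returns a connection until everybody has one, stampede(200, 5) opens 200 connections and throws 195 of them away.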

So how can we limit the total number of connections? I dreaded fixing this problem for a long time. I was thinking about keeping a counter of outstanding connections, which would need a lock, plus a condition variable to notify waiters whenever a slot became available, and then looping. But mixing that with the existing channel operation would be painfully tedious.

It turns out you just need another channel to control connection creation. This brings us to Go’s most amazing feature: select. The issue quickly went from something I was trying to push back on the application author to work around, to basically the following code:

func (cp *connectionPool) Get() (*memcached.Client, error) {
	// Try to grab an available connection within 1ms
	select {
	case rv := <-cp.connections:
		return rv, nil
	case <-time.After(time.Millisecond):
		// No connection came around in time, let's see
		// whether we can get one or build a new one first.
		select {
		case rv := <-cp.connections:
			return rv, nil
		case cp.createsem <- true:
			// Room to make a connection
			rv, err := cp.mkConn(cp.host, cp.auth)
			if err != nil {
				// On error, release our create hold
				<-cp.createsem
			}
			return rv, err
		}
	}
}


func (cp *connectionPool) Return(c *memcached.Client) {
	select {
	case cp.connections <- c:
	default:
		// Overflow connection.
		<-cp.createsem
		c.Close()
	}
}

Notice the new channel, cp.createsem, which is the semaphore we use for opening connections. The buffer size controls how many total connections we can possibly have outstanding. Each time we establish a new connection, we place an item in that channel. When we close a connection, we remove it again.

Return is roughly the same, except that the overflow path also releases the semaphore.
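Stripped of the pool, the semaphore idea looks roughly like this. The type and method names (semaphore, tryAcquire, release) are invented for the sketch; the pool code uses a bare chan bool directly.

```go
package main

// semaphore is a counting semaphore built on a buffered channel.
// The buffer capacity is the maximum number of simultaneous holders.
type semaphore chan bool

// tryAcquire takes a slot if one is free and reports whether it did.
func (s semaphore) tryAcquire() bool {
	select {
	case s <- true:
		return true
	default:
		return false
	}
}

// release frees a slot that was previously acquired.
func (s semaphore) release() { <-s }
```

len(s) is the number of slots currently held, which is handy when debugging a pool that appears to be stuck at its limit.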

The Get method is trickier. There’s a select block nested within another select block. The outer one only attempts to wait for an existing connection. If one becomes available relatively soon (one millisecond), we just use it. It’s generally better to use an existing connection than open a new one.

If the pool wait times out, we go into the inner select. It is similar to the outer one, but differs in an important way: we still wait for an available connection, but we also wait for the ability to create a new one. Waiting for this connection semaphore in the outer select block would cause us to open connections prematurely.

Rolling them into a single select is perfectly valid if the objects and their creation are cheap, but when they’re not, it’s easy enough to prefer reuse over creation as I’ve done here.
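As an illustration of the single-select form, here is a sketch of a bounded pool of byte buffers, which are cheap enough that reuse and creation can compete on equal terms. The bufPool type and its methods are hypothetical and not part of go-couchbase.

```go
package main

// bufPool hands out reusable byte slices. Buffers are cheap to make,
// so Get treats reuse and creation as equals in a single select.
type bufPool struct {
	idle chan []byte // buffers waiting for reuse
	sem  chan bool   // one slot per buffer in existence
	size int         // capacity of each new buffer
}

func newBufPool(limit, size int) *bufPool {
	return &bufPool{
		idle: make(chan []byte, limit),
		sem:  make(chan bool, limit),
		size: size,
	}
}

// Get blocks until a buffer is returned or there is room to make one.
// When both are possible, select picks one of them at random.
func (p *bufPool) Get() []byte {
	select {
	case b := <-p.idle:
		return b
	case p.sem <- true:
		return make([]byte, 0, p.size)
	}
}

// Put hands a buffer back, truncated to length zero. idle has room for
// every buffer Get created, so this send does not block as long as
// each buffer is returned at most once.
func (p *bufPool) Put(b []byte) {
	p.idle <- b[:0]
}
```

The trade-off is visible in Get: with an idle buffer available and the limit not yet reached, the pool may still allocate a new buffer, which is fine for buffers and wasteful for network connections.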

Final Version

The above shows an approximate evolution of my pool. Check out the production code if you want to see the final hardened version. There are a few differences between the above code and what’s in production:

  1. nil receivers are supported (will close a connection if there’s no pool)
  2. returning nil is supported (does nothing)
  3. there are health checks on the connection objects
  4. some tracing exists to help understand which path was used to establish connections.
  5. connection pool shutdown is supported via closing and clearing the connection channel
  6. there’s a short-circuit path that avoids allocating a timer
  7. there’s always an overall timeout on acquiring a connection

Otherwise, the theory of operation is as described above.

Also, for a good time, notice that I’ve got 100% test coverage on this pool. That’s really quite nice considering how much crap it has to deal with.

Bonus: (*connectionPool).Close()

This is only mildly related, but I wanted to describe the connection pool shutdown mechanism. It’s slightly clever and super dense, but I’m keeping it.

The purpose of this method is to close all connections available in the pool and signal to anything waiting for the pool that it’s closed and should error immediately (these are all the errPoolClosed paths you can see in the code).

This is the code in its entirety:

func (cp *connectionPool) Close() (err error) {
	defer func() { err, _ = recover().(error) }()
	close(cp.connections)
	for c := range cp.connections {
		c.Close()
	}
	return
}

So, ignore the defer for a second. We close the connections channel and then iterate the channel closing each connection within it. Then we return. The return is naked, so it’s just going to return the current value of err, which is nil (could also return nil – makes no difference here).

Pretty straightforward.

OK, now stop ignoring the defer. Why is that there? Well, if you call Close twice for some reason, the second channel close will panic. The recover catches that panic, converts it to an error, and stores it in err, the method’s return value. The first call returns nil and the second returns an error. Admittedly, the error doesn’t make it obvious what you did wrong, but at least the call doesn’t panic.

But what happens in the first call case? In the first call, recover returns something that isn’t an error (nil). We do a two-value type assertion which returns a nil value and false. Then we assign this new nil to our err return variable. The method returns nil, but for a different reason.
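The behaviour of the two-value assertion can be checked on its own. In this small sketch, asError and errBoom are names made up for the illustration.

```go
package main

import "errors"

// errBoom is a hypothetical error used only for this illustration.
var errBoom = errors.New("boom")

// asError applies the same two-value type assertion that the deferred
// function uses. Unlike the one-value form, it never panics: when v is
// nil or does not hold an error, it yields a nil error and false.
func asError(v interface{}) (err error, ok bool) {
	err, ok = v.(error)
	return err, ok
}
```

asError(nil) yields (nil, false), which is the first-call case, and asError(errBoom) yields (errBoom, true), which is the shape of the second-call case.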

This method actually just returns whatever the first result of a two-result type assertion on the recover is.
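To watch both calls behave as described, here is a runnable sketch that keeps the Close method as written above and swaps in a hypothetical conn type and a pared-down pool struct in place of the go-couchbase types.

```go
package main

// conn is a hypothetical stand-in for *memcached.Client.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// pool holds only what Close needs.
type pool struct{ idle chan *conn }

// Close drains and closes every idle connection. A second call
// recovers from the "close of closed channel" panic and returns it
// as an error instead.
func (p *pool) Close() (err error) {
	defer func() { err, _ = recover().(error) }()
	close(p.idle)
	for c := range p.idle {
		c.Close()
	}
	return
}
```

On a pool holding two idle connections, the first Close returns nil and leaves both connections closed; the second returns a non-nil error, because the runtime panic value raised by the duplicate close implements the error interface.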

