Golang patterns for serving on-demand, generated content

We’re using Go for some parts of the Gitorious project, and one of the recently open-sourced sub-projects written in Go is git-archive-daemon, a scalable, high-performance HTTP API for serving archives of git repositories.

While implementing this tool we noticed several patterns emerge that are not specific to git archive generation and can be applied to other use cases. Any kind of synchronous generation where a client waits on the connection for the response – on-demand image or sound generation, for example – can benefit from these patterns.

This tutorial will walk through these patterns, starting with a simple, naive implementation and gradually making it smarter and more powerful.

Note: All the code examples below lack proper error handling. This is on purpose, to make the examples shorter and easier to understand. Feel free to browse through the source of git-archive-daemon to see how we handle errors there.

Generating files on demand – the naive way

While the following techniques can be applied to any type of generated content, let’s use file generation as an example.

We’ll request a file by its “virtual filename”, passed in the filename query parameter (e.g. GET /?filename=demo.tar.gz, to pick an illustrative name). The simplest way of generating a file on demand and serving it could be this:

func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
        filename := req.URL.Query().Get("filename")
        path := workHard(filename) // takes "long" time
        http.ServeFile(w, req, path)
    })
    log.Fatal(http.ListenAndServe(":5000", nil))
}

The workHard function does the actual work and returns the path on disk to the generated file. It doesn’t matter how this function is implemented; we only know that it takes time. http.ServeFile from the net/http package sends the file back to the client. Looks simple, and it works… until you get lots of simultaneous requests.

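For reference, here’s a minimal stand-in for workHard – purely illustrative, not the real implementation – that fakes the heavy lifting by sleeping and then writing a dummy file (it assumes the time, os, io/ioutil and path/filepath packages are imported):

func workHard(filename string) string {
    time.Sleep(3 * time.Second) // pretend this is expensive CPU or I/O work
    path := filepath.Join(os.TempDir(), filename)
    ioutil.WriteFile(path, []byte("generated content for "+filename), 0644)
    return path
}
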
Note that the server started by ListenAndServe handles each incoming request in its own goroutine. This is desirable in most cases, as it allows handling hundreds of thousands of simultaneous connections – great when response generation is relatively light and fast. However, in our case, where we’re dealing with a CPU-heavy or I/O-heavy task, it can easily lead to DoS-ing the server.

Pattern 1: Processing requests with a worker pool

In order to limit the amount of heavy work executed to fulfill the incoming requests, we can use a pool of workers. With a fixed-size pool we can ensure that the amount of work happening at any given time doesn’t exceed a specified threshold.

Let’s introduce a Request struct and a worker pool:

type Request struct {
    HttpRequest    *http.Request
    ResponseWriter http.ResponseWriter
}

func WorkerPool(n int) chan *Request {
    requests := make(chan *Request)

    for i := 0; i < n; i++ {
        go Worker(requests)
    }

    return requests
}

func Worker(requests chan *Request) {
    for request := range requests {
        filename := request.HttpRequest.URL.Query().Get("filename")
        path := workHard(filename)
        http.ServeFile(request.ResponseWriter, request.HttpRequest, path)
    }
}

WorkerPool creates and returns a channel on which it receives the incoming requests to process. Every worker reads new requests from this channel, does the heavy lifting, and finally serves the resulting file back to the client.

Let’s create a Server struct that implements the http.Handler interface, and connect everything together in the main function:

type Server struct {
    Requests chan *Request
}

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    s.Requests <- &Request{HttpRequest: req, ResponseWriter: w}
}

func main() {
    requests := WorkerPool(5)
    server := &Server{Requests: requests}
    log.Fatal(http.ListenAndServe(":5000", server))
}

The request handling logic is trivial – it just sends every request over the channel, and the real request processing happens in the worker pool.

But it doesn’t work: you get an empty response body for every request. Why? There is a gotcha here. When ServeHTTP returns, the HTTP server considers the response complete and finalizes the connection, so we need to block in ServeHTTP until we have the response ready.

Let’s fix the code then. First, the Request struct:

type Request struct {
    Filename   string      // changed
    ResultChan chan string // changed
}

Instead of holding the http.Request and the http.ResponseWriter, our Request struct now has Filename, the only information from the request we need to do the job, and ResultChan, a channel from which we’ll read the job result (a path on disk) when it’s ready.

We also need to update ServeHTTP to receive the result from this channel:

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    filename := req.URL.Query().Get("filename")
    request := &Request{Filename: filename, ResultChan: make(chan string)}
    s.Requests <- request
    path := <-request.ResultChan // this blocks
    http.ServeFile(w, req, path)
}

The last thing to do is to update the worker to send the result over the ResultChan instead of directly sending the response to the client:

func Worker(requests chan *Request) {
    for request := range requests {
        path := workHard(request.Filename)
        request.ResultChan <- path // changed
    }
}

This is it. It works as expected now, properly sending the responses back to the clients while limiting the amount of work in progress.

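Because the requests channel is unbuffered, the ServeHTTP goroutines of excess requests simply block on the channel send until a worker becomes free, so pending requests queue up cheaply, each one costing only a parked goroutine.
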
Note that alternatively you can limit the number of concurrent requests with a tool like HAProxy. However, doing it the way explained above is more flexible, as we’ll see in the next section.

Pattern 2: Request grouping

Let’s assume the following:

  • Some of the requests are exactly the same (they trigger the same work and response)
  • The server gets hundreds of requests per second
  • Generating a response takes 5+ seconds

With the current implementation we pass every new request onto the processing queue (the requests channel), whether or not a response is already being generated for an identical request. This is not ideal: we do the same work multiple times and waste resources. Let’s fix that.

We’ll update our implementation to group identical requests together and send the response to all of the related clients as soon as the work is done.

To do that, let’s first make a distinction between a “request” and a “job”. We introduce Job, a struct that holds a filename (the job payload) and the result. We also put a Job inside each Request:

type Job struct {
    Filename string
    Result   string
}

type Request struct {
    Job        *Job // changed
    ResultChan chan string
}

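A corresponding change to ServeHTTP is needed as well, since the earlier version sets a Filename field that no longer exists. It now builds a Job and wraps it in a Request; a minimal sketch:

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    filename := req.URL.Query().Get("filename")
    job := &Job{Filename: filename}
    request := &Request{Job: job, ResultChan: make(chan string)}
    s.Requests <- request
    path := <-request.ResultChan // still blocks until the job is done
    http.ServeFile(w, req, path)
}
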
Workers will now operate on Job structs, reading new jobs from the jobs channel and sending processed jobs to the results channel:

func WorkerPool(n int) (chan *Job, chan *Job) {
    jobs := make(chan *Job)
    results := make(chan *Job)

    for i := 0; i < n; i++ {
        go Worker(jobs, results)
    }

    return jobs, results
}

func Worker(jobs chan *Job, results chan *Job) {
    for job := range jobs {
        job.Result = workHard(job.Filename)
        results <- job
    }
}

Next, we need to group the requests by their jobs’ payload (Filename). Let’s keep a “request queue” (a slice of Requests) for each unique filename, and make sure we only schedule a single job per queue.

This logic will live in a function named RequestMux, as it effectively multiplexes requests over a pool of workers. Here is how it looks:

func RequestMux(jobs chan *Job, results chan *Job) chan *Request {
    requests := make(chan *Request)

    go func() {
        queues := make(map[string][]*Request)

        for {
            select {
            case request := <-requests:
                job := request.Job
                queues[job.Filename] = append(queues[job.Filename], request)

                if len(queues[job.Filename]) == 1 { // the one we appended is the first one
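                    // dispatch in a goroutine: a direct send on jobs could
                    // block this loop while all workers are busy, and the
                    // results case below would stop draining – a deadlock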
                    go func() {
                        jobs <- job
                    }()
                }

            case job := <-results:
                for _, request := range queues[job.Filename] {
                    request.ResultChan <- job.Result
                }

                delete(queues, job.Filename)
            }
        }
    }()

    return requests
}

Let’s focus on the goroutine that gets spawned in this function. It does the following things in a loop:

  • processes each incoming request, adding it to the queue identified by its job’s filename,
  • sends a job to the worker pool for every newly created queue,
  • handles each processed job, sending the result to every queued request’s ResultChan.

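Note that the queues map is owned by this single goroutine and is only touched from inside the select loop, so it needs no locking – the state is protected by communication rather than by a mutex.
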
Finally, RequestMux returns the requests channel so the server can send new requests to it. We need to modify the main function to reflect the changes we made to the worker pool and to wire up RequestMux:

func main() {
    jobs, results := WorkerPool(5) // changed
    requests := RequestMux(jobs, results) // added
    server := &Server{Requests: requests}
    log.Fatal(http.ListenAndServe(":5000", server))
}

It is a pretty powerful server now, but there is still one more optimization we can do.

Pattern 3: Caching job results

Request grouping prevents us from doing the same work in parallel when identical requests come in within a relatively short period of time. However, we still repeat the same work when identical requests arrive sequentially rather than simultaneously.

Sounds like a job for a cache. This time, let’s update the main function first:

func main() {
    jobs, results := WorkerPool(5)
    jobs, results = Cache(jobs, results) // added
    requests := RequestMux(jobs, results)
    server := &Server{Requests: requests}
    log.Fatal(http.ListenAndServe(":5000", server))
}

Notice that we “hijack” the jobs and results variables! We use a Cache function that takes the original jobs and results channels and returns a new pair, which we then pass to RequestMux. This nicely shows how easily you can compose different pieces of code using channels in Go.

An example Cache function looks like this:

func Cache(upstreamJobs chan *Job, upstreamResults chan *Job) (chan *Job, chan *Job) {
    jobs := make(chan *Job)
    results := make(chan *Job)

    go func() {
        for {
            select {
            case job := <-jobs:
                cachedPath := "/cache/" + job.Filename
                if isCached(cachedPath) { // cache hit
                    job.Result = cachedPath
                    results <- job
                } else { // cache miss
                    // send asynchronously so this loop can keep draining
                    // upstreamResults even while all workers are busy
                    go func() {
                        upstreamJobs <- job
                    }()
                }

            case job := <-upstreamResults:
                cachedPath := "/cache/" + job.Filename
                moveFile(job.Result, cachedPath)
                job.Result = cachedPath
                results <- job
            }
        }
    }()

    return jobs, results
}

The idea here is to receive new jobs from the jobs channel and check whether there is a cached result. If there is, we immediately pass the result on the results channel. Otherwise we forward the job to the original jobs channel (locally named upstreamJobs). When the real job has been processed we receive it from the original results channel (locally named upstreamResults), cache the result, and send it back over the new results channel.

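The isCached and moveFile helpers aren’t shown; minimal illustrative versions (once again without the error handling real code would need) might look like this:

// isCached reports whether a cached result already exists at the given path.
func isCached(path string) bool {
    _, err := os.Stat(path)
    return err == nil
}

// moveFile moves a freshly generated file into the cache. os.Rename only
// works within a single filesystem; a real implementation might have to
// copy the file across devices instead.
func moveFile(src, dst string) {
    os.Rename(src, dst)
}
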
Note that you can apply this technique to any type of work pipeline that involves input (jobs) and output (results) channels.

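In fact, any stage with the same shape – a function that consumes a jobs/results channel pair and returns a new pair – can be chained into the pipeline in exactly the same way Cache was.
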
Conclusion

We hope you enjoyed this overview of the patterns we have found so useful. Go is a great language for solving this type of problem, and we’re very happy to have it in our arsenal at Gitorious.
