Array-Based Queue on Aerospike

21 March 2015

Aerospike is a high-performance, horizontally scalable key-value store. Running 1M transactions per second without breaking a sweat, this NoSQL database delivers 10-15x price-performance gains. Notably, through flash optimizations, RAM-like performance can be achieved using local SSDs. Further, Aerospike operates as an AP data store.

Similar to Redis, Aerospike provides built-in support for complex data types such as lists and maps. If you are migrating a Redis-based application for the performance rewards, you can conveniently use a complete Redis veneer. Note that when storing built-in Aerospike data structures in records, you are bound by the typical record size limit of 1MB. The Large Data Type (LDT) Lua extensions further provide support for ordered lists, sets, stacks, and maps. Using sub-records, LDTs may grow freely, limited only by the underlying storage.

Through User Defined Function (UDF) modules, data structures can be managed directly within Aerospike on the server. The Redis veneer above provides a great example of this style of record management and is the basis for the UDF module below. To enable runtime characteristics similar to those of a memory-resident linked list in Redis, you can implement efficient queuing using two array list buffers. Let's begin with the enqueue UDF in Lua. NOTE: For a more scalable approach with similar runtime properties, please see the aerospike-LDT-techniques repo.

function enqueue(rec, bin, value)
    local q = rec[bin]
    -- Create the queue map on first use
    if q == nil then
        q = map { size = 0, pos = 0 }
    end

    -- Append the incoming value to the rear buffer
    if q.rear == nil then
        q.rear = list()
    end
    list.append(q.rear, value)
    q.size = q.size + 1

    -- Write the queue back to the record
    rec[bin] = q
    l_update(rec)
    return q.size
end

The UDF above participates in the main flow of the transaction. It sets up a map to manage the queue data structure if one does not already exist, appends the incoming value to a rear array list buffer, and then updates the record through the local helper function l_update.
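
The l_update helper lives in the complete queue.lua module linked below. As a minimal sketch, assuming the standard aerospike:exists, aerospike:create, and aerospike:update record operations, such a helper might look like this.

local function l_update(rec)
    -- Persist the record, creating it on first write
    if aerospike:exists(rec) then
        aerospike:update(rec)
    else
        aerospike:create(rec)
    end
end

Next, let's look at the dequeuing function, which manages our second buffer.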

function dequeue(rec, bin)
    local q = rec[bin]
    -- Nothing to dequeue if the queue does not exist or is empty
    if q == nil or q.size == 0 then
        return nil
    end

    -- If no front buffer exists, promote the rear buffer to the front
    if q.front == nil then
        q.front = q.rear
        q.pos = 1
        map.remove(q, "rear")
    end

    local item = nil
    local last = list.size(q.front)

    if last == 1 then
        -- Last remaining item; drop the front buffer entirely
        item = q.front[1]
        map.remove(q, "front")
    else
        if q.pos < last then
            -- Take the item at the current position and move the last
            -- item into its slot, avoiding a linear shift
            item = q.front[q.pos]
            q.front[q.pos] = q.front[last]
            q.pos = q.pos + 1
        else
            -- The position has reached the end; dequeue from the last slot
            item = q.front[last]
        end
        list.remove(q.front, last)
    end
    q.size = q.size - 1

    -- Write the queue back to the record
    rec[bin] = q
    l_update(rec)
    return item
end

The dequeuing logic above manages the front buffer of the queue. If no front buffer exists, it moves the rear buffer to the front. As we are using array lists for our buffers, dequeuing from the beginning of the list would be a linear operation, since all subsequent items would need to be shifted left. By maintaining a position in the queue, we can dequeue more efficiently: as items are dequeued, we copy the item currently in the last slot into the slot of the item being dequeued, iteratively reversing the list. Once the position reaches the end of the list, we dequeue from the last slot until the front buffer is empty.
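
To make the position trick concrete, here is a short trace of the front buffer across four dequeues (the item names are illustrative).

-- front = [a, b, c, d], pos = 1    (just promoted from the rear buffer)
-- dequeue -> a    front = [d, b, c], pos = 2   (last item copied into slot 1)
-- dequeue -> b    front = [d, c], pos = 3
-- dequeue -> c    front = [d]                  (pos >= last, pop the last slot)
-- dequeue -> d    front buffer removed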

The complete queue.lua UDF module is available here. To install, use the udf-put command below.

ascli udf-put queue.lua

To work with the queue through the aql console, the UDFs can be invoked with the execute command.

aql> execute queue.enqueue('Q', 99) on demo.queues where pk='q'
+---------+
| enqueue |
+---------+
| 1       |
+---------+

aql> execute queue.dequeue('Q') on demo.queues where pk='q'
+---------+
| dequeue |
+---------+
| 99      |
+---------+

Using two buffers, we can achieve runtime characteristics similar to that of a linked list. As this queue example stores the data structure within a single Aerospike record, it is bound by the aforementioned record storage limits.

References

Thanks to Peter Milne for creating the excellent Redis veneer article on which this UDF module is based. Additionally, please see Peter's aerospike-LDT-techniques repo for a FIFO queue utilizing LDTs. As opposed to the in-record storage limitations discussed above, the LDT approach scales queueing by utilizing an LLIST as a map with counters positioned at the head and tail.

By Aaron Dunnington

HTTP Request Context in Golang

7 January 2015

The Go HTTP stack offers a very efficient web server implementation. As the Request object passed to Handlers does not itself provide storage for per-request values, context specific to the lifetime of each request needs to be managed outside net/http.

Go web frameworks such as Goji, Beego, and Revel have built-in support for request context. Further, the Gorilla context package provides support for global request variables, though it relies on a mutex to guard access to request state. If you are running directly on net/http, you can also inject a context object at the beginning of the request lifecycle which flows through the handler chain. By passing context through, rather than relying on a global context object, state specific to each request can be managed without introducing locks.

type Handler func(http.ResponseWriter, *http.Request, *Context)

func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ctx := &Context{
        Config: Config,
        DB:     DB,
        Items:  map[string]interface{}{},
    }
    h(w, r, ctx)
}
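
The handler above assumes a Context type along with package-level Config and DB values that are not shown here. A minimal sketch, assuming database/sql and an illustrative AppConfig type, might look like the following.

import "database/sql"

// AppConfig holds application-wide settings; the fields are illustrative.
type AppConfig struct {
    BaseURL string
}

// Package-level values copied into each request's Context (assumed names).
var (
    Config = &AppConfig{}
    DB     *sql.DB
)

// Context carries per-request state through the handler chain.
type Context struct {
    Config *AppConfig             // shared application configuration
    DB     *sql.DB                // shared database handle
    Items  map[string]interface{} // per-request values, e.g. a CSRF token
}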

To build a chain of Handlers that accept context, values of the custom Handler type above can be wrapped. The outer Handler returned holds a reference to the Handler chain passed in. When executed by the networking stack, the outer Handler's ServeHTTP method is invoked, creating the custom Context object before forwarding the request through the handler chain.

func Csrf(h Handler) Handler {
    return Handler(
        func(w http.ResponseWriter, r *http.Request, ctx *Context) {

            // Your super secure CSRF logic here...

            ctx.Items["csrf"] = token
            h(w, r, ctx)
        })
}

As Handler implements the ServeHTTP method, the first handler instance in the chain can be registered with a given pattern on your router.

func home(w http.ResponseWriter, r *http.Request, ctx *Context) {
    Render("home.tmpl", ctx.Items["csrf"])
}

router.Handle("/", Csrf(Handler(home)))

At runtime, the Handler returned by Csrf executes first; its ServeHTTP method sets up the request environment by creating the Context. Next, the Csrf function body runs, setting a token within the Context. Finally, the home Handler is invoked with the request context state available.

By Aaron Dunnington

Autoscale Resque Workers on Heroku

27 November 2011

A common technique for autoscaling Resque workers on Heroku uses the after_enqueue and after_perform hooks to adjust the current worker scale up and down, respectively. The after_perform hook watches the pending job count; when no jobs remain, it scales all workers down. A minimal sketch of this hook-based approach follows.
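
The sketch below is illustrative only: the job class, queue name, and the heroku_scale_workers helper (which would wrap your Heroku API client of choice) are assumptions, not part of any particular library.

require 'resque'

module HerokuAutoScale
  def after_enqueue_scale_up(*args)
    # Scaling up is additive, so it is safe even while other workers are busy.
    heroku_scale_workers(Resque.workers.size + 1)
  end

  def after_perform_scale_down(*args)
    # When no pending jobs remain, scale all workers down.
    heroku_scale_workers(0) if Resque.info[:pending].zero?
  end

  def heroku_scale_workers(count)
    # Call the Heroku API here, e.g. via your preferred Heroku client gem.
  end
end

class ImageResizeJob
  extend HerokuAutoScale
  @queue = :images

  def self.perform(image_id)
    # ... resize the image ...
  end
end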

When scaling workers up, the Heroku API is additive; additional workers can be added while others are actively processing jobs. However, as the Heroku API does not currently allow individually specified workers to be scaled down, it is necessary to wait until all workers are inactive before performing a scale down operation.

While the after_perform scale down approach guards against shutting down an active worker while pending jobs exist, a race condition can still occur: the pending job count hits zero and a scale down operation is invoked on the Heroku API, but before it completes, a new job is enqueued from the app and reserved by a worker that is about to be shut down by the in-flight scale operation.

Recently, I have been working on the resque-heroku-scaler gem, which autoscales Resque workers through a separate monitor process. The scaler monitor process polls the specified Resque Redis backend for pending jobs. When scaling down, the scaler process first locks each worker out of reserving any further pending jobs, and only then performs the scale down operation.

To get started, include the scaler tasks in lib/tasks/scaler.rake:

require 'resque/tasks'
require 'resque/plugins/heroku_scaler/tasks'

task "resque:setup" => :environment

In your Procfile, configure the scaler process using:

scaler: bundle exec rake resque:heroku_scaler

To run the scaler process, use the following command.

heroku scale scaler=1

Require the worker extensions within the app running your workers, for example in lib/tasks/resque.rake. These extensions to the worker run loop poll for a shared flag in the Redis backend, set by the scaler process, which indicates the worker should enter an inactive scaling state. After a worker has entered this scaling state, it stops attempting to reserve incoming pending jobs. Once all workers have indicated they are inactive, the scaler process issues the scale down operation using the Heroku API.

require 'resque/tasks'

task "resque:setup" => :environment do
    require 'resque-heroku-scaler'
    ENV['QUEUE'] = '*'
end
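
Conceptually, the worker-side extension boils down to checking a shared flag before reserving the next job. The sketch below is illustrative only and is not the gem's actual implementation; the module, method, and key names are assumptions.

require 'resque'

module ScaleDownAware
  SCALING_DOWN_KEY = 'heroku-scaler:scaling-down' # hypothetical key name

  # True while the scaler process has asked workers to go quiet.
  def scaling_down?
    Resque.redis.get(SCALING_DOWN_KEY) == 'true'
  end

  # Skip reserving new jobs while a scale down is in progress.
  def reserve
    return nil if scaling_down?
    super
  end
end

# Prepend so our reserve wraps the stock Resque::Worker#reserve (Ruby >= 2.0).
Resque::Worker.prepend(ScaleDownAware)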

In your development environment, the scaler process can run local worker processes using the rush library. To configure, use the following in your scaler rake file.

require 'resque/tasks'
require 'resque/plugins/heroku_scaler/tasks'

task "resque:setup" => :environment do
    if Rails.env.development?
        require 'resque-heroku-scaler'
        ENV["RUSH_PATH"] ||= File.expand_path('/path/to/app', __FILE__)
        Resque::Plugins::HerokuScaler.configure do |c|
            c.scale_manager = :local
        end
    end
end

By Aaron Dunnington

See the index for more articles.