Migrating from Resque to Sidekiq

Resque at Sharethrough

Updated 9/1/2015: We have updated our SlackErrorService to use Sidekiq’s global error handlers feature.

Back in 2013, when we began our platform rewrite, there were many Ruby-based queuing options available. Three stood out as the strongest choices: Delayed Job, Resque, and Sidekiq. Delayed Job uses ActiveRecord and your database to store job metadata, while Resque and Sidekiq both use Redis. Resque’s architecture forks a process per job, while Sidekiq uses Ruby threads. Due to its success at GitHub and its ability to handle code that is not thread-safe, Resque was chosen as our message queue. In the past 2.5 years, Resque has served us well, scaling with us as we’ve grown to accommodate more advertisers and publishers on our platform. Lately, however, it has fallen short, which led us to migrate all of our background workers to Sidekiq.

After our recent switch to continuous delivery, we started to notice more and more Resque::TermException errors being raised. These occurred after every deploy, which could happen up to 20 times a day. It appeared that Resque wasn’t handling system-level signals properly.

We tried to fix the situation by following Heroku’s advice of starting our workers with an additional environment variable introduced in Resque 1.22:

$ TERM_CHILD=1 QUEUES=* rake resque:work

and catching the exception and re-enqueueing the job:

class SharethroughJob
  def self.perform(object_id)
    ...
  rescue Resque::TermException
    SharethroughJob.enqueue(object_id)
  end
end

While this caught some errors, we still saw failures on every deploy when Upstart restarted our Resque workers. As we dug deeper, we noticed that the 1-x branch of Resque hadn’t received any significant commits in the last six months. A redesigned version, 2.0, is in active development; however, there are no releases on RubyGems and not much recent activity. Instead of trying to patch an unmaintained project, we decided it was worth reevaluating our queueing system.

Introducing Sidekiq

We took a look at the current Ruby message queue landscape and were surprised at how far Sidekiq had come in the past two years. It’s actively maintained, offers a paid Pro version with extra features and support, and has some very comprehensive documentation. Sidekiq’s API is also very similar to Resque’s, so the transition promised to be pretty painless.

The following is a Resque job:

class SharethroughJob
  def self.perform(object_id)
    #do work here
  end
end

converted to a Sidekiq job would look like:

class SharethroughJob
  include Sidekiq::Worker

  def perform(object_id)
    #do work here
  end
end

We decided to de-risk our move to Sidekiq by migrating our most frequently run and most computationally intensive jobs first. If Sidekiq could handle these jobs, it could most definitely handle the rest.

Migrating from Resque to Sidekiq

Our Resque Setup

Our Resque setup made use of the resque-loner gem, which allowed us to enqueue only unique jobs. Additionally, we integrated with Airbrake to catch errors, Librato for metrics, and Slack for notifications. This is what our Resque initializer looked like:

require 'resque/failure/multiple'
require 'resque/failure/airbrake'
require 'resque/failure/redis'
require 'resque/failure/slack'

Resque.redis = Bakery::Application.config.redis
Resque.redis.auth(APP_CONFIG["redis_auth"]) if APP_CONFIG.has_key? "redis_auth"

Resque.after_fork = Proc.new do
  ActiveRecord::Base.establish_connection
  Sharethrough.establish_connection
end

Resque::Failure::Airbrake.configure do |config|
  config.api_key = APP_CONFIG['airbrake_api_key']
  config.secure = true
end

Resque::Failure::Slack.configure do |config|
  config.channel = 'eng-resque-errors'
  config.token = APP_CONFIG['slack_api_key']
  config.level = :compact
end

Resque::Failure::Multiple.classes = [Resque::Failure::Redis, Resque::Failure::Airbrake]
Resque::Failure::Multiple.classes << Resque::Failure::Slack if Rails.env.production?
Resque::Failure.backend = Resque::Failure::Multiple

To plug into Librato, we wrote some custom code to wrap all calls to the perform instance method of our jobs:

module Sharethrough
  module Job
    def self.included(klass)
      klass.extend(ClassMethods)
      klass.define_custom_error_class
    end

    module ClassMethods
      # Default queue name; individual jobs override this class method
      def queue
        :default
      end

      def perform(*args)
        instance = new
        instance.perform(*args)
      rescue => e
        ::Rails.logger.error(e.message)
        send_to_librato if Rails.env.production?
        raise wrap_error(e)
      end

      def send_to_librato
        librato.send_gauges({ librato_key => 1 })
      end

      def librato
        Sharethrough::Librato.new(
          APP_CONFIG['librato_username'],
          APP_CONFIG['librato_api_key']
        )
      end

      def librato_key
        "resque.#{queue.to_s}.errors"
      end

      def wrap_error(error)
        custom_error_class.new("#{error.class.name.demodulize} : #{error.inspect}").tap do |custom_error|
          custom_error.set_backtrace(error.backtrace)
        end
      end

      def custom_error_class
        [self.name, custom_error_classname].join('::').constantize
      end

      def custom_error_classname
        "#{self.name.demodulize}Error"
      end

      def define_custom_error_class
        self.const_set(custom_error_classname, Class.new(StandardError))
      end
    end
  end
end

Any job that needed to send metrics to Librato looked like this:

class SharethroughJob
  include Sharethrough::Job

  def self.queue
    :queue_name
  end

  def perform(object_id)
    # do work here
  end
end

Sidekiq Setup

To enqueue only unique jobs, we include the sidekiq-unique-jobs gem, which allows us to specify sidekiq_options unique: true on any job. Simply including the Airbrake gem is enough for error reporting: it automatically detects Sidekiq and catches any errors. Sidekiq also exposes a nice error handler API, which we plug into to send error metrics to Librato.
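
For illustration, a unique job might look like the following minimal sketch; the class name here is hypothetical, and unique: true is the sidekiq-unique-jobs option described above:

class DailyRollupJob
  include Sidekiq::Worker

  # With sidekiq-unique-jobs, enqueueing this job again with the same
  # arguments is a no-op while a matching job is already queued or running
  sidekiq_options unique: true

  def perform(object_id)
    # do work here
  end
end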

We have also opted to take advantage of Sidekiq’s ability to retry a job a certain number of times before it is considered “dead”. This is what our Sidekiq initializer looks like:

Sidekiq.configure_server do |config|
  config.redis = { url: APP_CONFIG['redis_url'] }
  config.error_handlers << Proc.new {|ex, ctx_hash| Sharethrough::Sidekiq::LibratoErrorService.notify(ex, ctx_hash) }
  config.error_handlers << Proc.new {|ex, ctx_hash| Sharethrough::Sidekiq::SlackErrorService.notify(ex, ctx_hash) }
end

Sidekiq.configure_client do |config|
  config.redis = { url: APP_CONFIG['redis_url'] }
end

Sidekiq.default_worker_options = { 'backtrace' => true, 'retry' => 3, 'unique' => true }

Our LibratoErrorService looks like this:

module Sharethrough
  module Sidekiq
    class LibratoErrorService
      class << self
        def notify(exception, ctx_hash)
          send_to_librato(ctx_hash['queue'])
        end

        def send_to_librato(queue)
          librato.send_gauges("sidekiq.#{queue}.errors" => 1)
        end

        def librato
          Sharethrough::Librato.new(
            APP_CONFIG['librato_username'],
            APP_CONFIG['librato_api_key']
          )
        end
      end
    end
  end
end

Instead of being notified on Slack for every single error, we decided it would be better to be notified only if a job makes it into the dead queue. This occurs only after Sidekiq has exhausted its retries. This is what our SlackErrorService looks like:

module Sharethrough
  module Sidekiq
    class SlackErrorService
      class << self
        def notify(exception, ctx_hash)
          if ctx_hash['retry_count'] == 3 && Rails.env.production?
            text = ""
            text << "Message:     *#{ctx_hash['error_message']}*\n"
            text << "Job Class:   `#{ctx_hash['class']}`\n"
            text << "Error Class: `#{ctx_hash['error_class']}`\n"
            text << "Queue:       _#{ctx_hash['queue']}_\n"
            text << "Args:        #{ctx_hash['args'].inspect}"
            slack_client.chat_postMessage({channel: channel, text: text})
          end
        end

        def slack_client
          Slack::Client.new
        end

        def channel
          'eng-sidekiq-errors'
        end
      end
    end
  end
end

Additionally, we have information about our concurrency and queue priority defined in our sidekiq.yml file:

:concurrency: 5
:queues:
  - most_important_queue
  - second_most_important_queue
  - default
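
Listed this way, the queues are strictly ordered: Sidekiq always drains most_important_queue before looking at second_most_important_queue. If strict ordering ever starved our lower-priority queues, Sidekiq also supports weighted queues; a hypothetical variant of our config (the weights are for illustration only) would look like:

:concurrency: 5
:queues:
  - [most_important_queue, 6]
  - [second_most_important_queue, 3]
  - [default, 1]

With weights, Sidekiq picks the next queue to check at random in proportion to its weight instead of always checking in order.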

Deployment

At Sharethrough we use Capistrano to deploy our code and Upstart to manage processes on our Ubuntu servers. We define how to start our worker process in a Procfile:

worker: env bundle exec sidekiq

and execute a Capistrano task that converts this to Upstart scripts using the Foreman gem:

namespace :deploy do
  namespace :foreman do
    desc "Export the Procfile to upstart scripts"
    task :export, :roles => :worker, :on_no_matching_servers => :continue do
      run "cd #{release_path} && sudo bundle exec foreman export upstart /etc/init -a #{application} -u #{user} -l #{shared_path}/log  -f #{release_path}/Procfile -c worker=1"
    end
  end
end

The Foreman gem will generate the appropriate Upstart scripts for one worker process, and that process will spawn 5 threads. To start our workers, all we need to do is execute service application_name start, and Upstart takes care of everything.

@mperham provides a more comprehensive Upstart script that he recommends; we will likely move to something like it in the future.
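
For reference, a stripped-down sketch of that kind of Upstart config might look like the following; the paths, user, and timeout here are placeholder assumptions, not our production values:

# /etc/init/sidekiq.conf
description "Sidekiq background worker"

start on runlevel [2345]
stop on runlevel [06]

# Restart the process if it dies unexpectedly
respawn

# Upstart sends SIGTERM on stop; give Sidekiq time to finish in-flight jobs
kill timeout 30

setuid deploy
chdir /var/www/app/current

exec bundle exec sidekiq -e production -C config/sidekiq.yml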

Gotchas

The initial migration from Resque to Sidekiq was pretty painless, but when we deployed to production we ran into quite a few gotchas. The Problems and Troubleshooting section of the Sidekiq wiki proved to be very valuable.

Sidekiq is Fast

We noticed quite a few ActiveRecord::RecordNotFound: Couldn't find Model with 'id'=1234 failures. Jobs throwing this error would fail once, retry, and then succeed. This was happening in models with an after-save callback that enqueued a job. Sidekiq is really, really fast. So fast that it will try to run a job before ActiveRecord has finished committing the database transaction that creates the model. We fixed this by using an after_commit hook to ensure the database record exists before we execute a job that depends on it:

class Model < ActiveRecord::Base
  after_commit :enqueue

  # after_commit callback methods take no arguments, so we use the record's own id
  def enqueue
    Jobs::SharethroughJob.perform_async(id)
  end
end

If no ActiveRecord models are involved, another approach is to enqueue the job with a time delay: Jobs::SharethroughJob.perform_in(5.seconds, object_id)

Rails Eager Loading

Another error we started to see intermittently was NameError: uninitialized constant Sharethrough::DependencyClassForJob. Essentially, helper classes used by our main worker class were not being loaded properly. This was due to the way Rails eager loads files: all our jobs live in the lib directory, so they were not eager loaded automatically. We explicitly required these files, which looked a bit ugly:

require '../sharethrough/dependency_class_for_job'
require '../sharethrough/dependency2'
require '../sharethrough/dependency3'

module Jobs
  class SharethroughJob
    include Sidekiq::Worker
  end
end

We pushed our changes and closed out this bug. Shortly after, we started to notice another error: RuntimeError: Circular dependency detected while autoloading constant Sharethrough::DependencyClassForJob. The definition of Sharethrough::DependencyClassForJob hadn’t finished loading when our worker class referenced it, which triggered const_missing; ActiveSupport::Dependencies then tried to load the constant, saw that its file was already being loaded, and raised the circular dependency error.

After taking a couple of steps back and trying to understand why certain code wasn’t being loaded properly, we stumbled on a section in the Sidekiq docs that explains why Sidekiq doesn’t autoload Rails application code.

Eager loading loads your code only once when starting the server — it does not reload your code between jobs. If you have code located outside of the standard Rails directories (e.g. app/models, app/controllers, etc.), Sidekiq will not see it unless you add the path to eager_load_paths in your Rails application configuration.

We could remove the require statements and add the following to our application.rb:

config.eager_load_paths += %W(
  #{config.root}/lib/jobs
  #{config.root}/lib/sharethrough
)

Note: if you previously included these directories in config.autoload_paths, that entry is no longer necessary, since Rails also checks the eager load paths when autoloading.

ActiveRecord Connection Pooling

We use MySQL as our database and have a max connection pool setting of 5. Here is a snippet from our database.yml:

defaults: &defaults
  adapter: mysql2
  encoding: utf8
  pool: 5

This means that every Rails process (web or worker) is limited to 5 database connections at any given time. We decided to roll out Sidekiq with a concurrency of 5 threads so that each thread could obtain its own connection to the database (by default, Sidekiq starts with 25 threads).
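
One common pattern for keeping the pool size and worker concurrency in sync (an illustration, not part of our setup; DB_POOL is a made-up variable name) is to drive both from the same environment variable, since Rails and Sidekiq both run their YAML config files through ERB:

# config/database.yml
defaults: &defaults
  adapter: mysql2
  encoding: utf8
  pool: <%= ENV.fetch('DB_POOL', 5) %>

# config/sidekiq.yml
:concurrency: <%= ENV.fetch('DB_POOL', 5) %>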

We noticed the following intermittent errors: ActiveRecord::ConnectionTimeoutError: could not obtain a database connection within 5.000 seconds (waited 5.000 seconds). This didn’t make sense, because in theory each thread could have its own dedicated connection to the database. Taking a peek at the database, it appeared that Sidekiq was occasionally leaking connections. We decided to mitigate this problem by wrapping all calls to the database in the thread-safe method ActiveRecord::Base.connection_pool.with_connection. Any job that needs to connect to ActiveRecord now inherits from:

module Jobs
  class BaseActiveRecordJob
    def perform(*args)
      # Check a connection out of the pool only for the duration of the job
      ActiveRecord::Base.connection_pool.with_connection do
        perform_with_ar_connection(*args)
      end
    ensure
      # Defensively release anything this thread still holds: return active
      # connections to the pool, then check the current connection back in
      ActiveRecord::Base.clear_active_connections!
      ActiveRecord::Base.connection.close
    end
  end
end

Our new job looks like:

module Jobs
  class SharethroughJob < BaseActiveRecordJob
    include Sidekiq::Worker

    def perform_with_ar_connection(object_id)
      #do work touching the db
    end
  end
end

Future Improvements

Now that all our jobs have been moved over to Sidekiq and everything is stable, we would like to optimize our use of it. We are experimenting with increasing our concurrency, and we would also like to run more Sidekiq processes on our worker boxes.

We are also curious to assess Sidekiq Enterprise, which offers additional features such as improved reliability, batch jobs, and support.

We’re also going to try porting our job logic over to the new ActiveJob API, so that swapping out message queue backends will hopefully be less painful in the future.
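
As a rough sketch of where that would take us (assuming Rails 4.2’s ActiveJob with the Sidekiq adapter; the class body is illustrative), the same job would look like:

# config/application.rb
config.active_job.queue_adapter = :sidekiq

# The job, expressed with the ActiveJob API
class SharethroughJob < ActiveJob::Base
  queue_as :default

  def perform(object_id)
    # do work here
  end
end

Enqueuing then becomes SharethroughJob.perform_later(object_id), and the backend can be swapped by changing only the queue_adapter setting.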