Migrating from Resque to Sidekiq
Resque at Sharethrough
Updated 9/1/2015: We have updated our SlackErrorService to use Sidekiq's global error handlers feature.
Back in 2013, when we began our platform rewrite, there were many Ruby-based queuing options available. Three stood out as the best choices: Delayed Job, Resque, and Sidekiq. Delayed Job uses ActiveRecord and your database to store job metadata, while Resque and Sidekiq both use Redis. Resque's architecture involves forking processes, while Sidekiq uses Ruby threads. Due to its success at GitHub and its ability to handle code that is not thread-safe, Resque was chosen as our message queue. Over the past 2.5 years, Resque has served us well. It has scaled up with us as we've grown to accommodate more advertisers and publishers on our platform. Lately, however, it has fallen short, which led us to migrate all of our background workers to Sidekiq.
After our recent switch to continuous delivery, we started to notice more and more Resque::TermException errors being raised. These occurred after every deploy, which could happen up to 20 times a day. It appeared that Resque wasn't handling system-level signals properly.
We tried to fix the situation by following Heroku’s advice of starting our workers with an additional environment variable introduced in Resque 1.22:
$ TERM_CHILD=1 QUEUES=* rake resque:work
and catching and re-enqueuing the errors:
class SharethroughJob
  def self.perform(object_id)
    ...
  rescue Resque::TermException
    SharethroughJob.enqueue(object_id)
  end
end
While this caught some errors, we still noticed failures on every deploy when Upstart restarted our Resque workers. As we dug deeper, we noticed that the 1-x branch of Resque hadn't received any significant commits in the last six months. A redesigned version, 2.0, is in the works, but it has no releases on RubyGems and not a lot of recent development. Instead of trying to patch an unmaintained project, we decided it could be worthwhile to reevaluate our queueing system.
Introducing Sidekiq
We took a look at the current Ruby message queue landscape and were surprised at how far Sidekiq had come in the past two years. It's actively maintained, offers a paid pro version with extra features and support, and has very comprehensive documentation. Sidekiq's API is very similar to Resque's, so the transition would be pretty painless.
The following is a Resque job:
class SharethroughJob
  def self.perform(object_id)
    # do work here
  end
end
converted to a Sidekiq job would look like:
class SharethroughJob
  include Sidekiq::Worker

  def perform(object_id)
    # do work here
  end
end
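The enqueue side is just as close. As a quick sketch (these call sites are illustrative rather than lifted from our codebase), enqueuing the Resque job:

Resque.enqueue(SharethroughJob, object_id)

becomes the following with Sidekiq:

SharethroughJob.perform_async(object_id)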
We decided to de-risk our move to Sidekiq by migrating our most frequently run and computationally intensive jobs first. If Sidekiq could handle these jobs, then it could most definitely handle the rest.
Migrating from Resque to Sidekiq
Our Resque Setup
Our Resque setup made use of the resque-loner gem, which allowed us to enqueue only unique jobs. Additionally, we integrated with Airbrake to catch any errors, Librato for metrics, and Slack for notifications. This is what our Resque initializer looked like:
require 'resque/failure/multiple'
require 'resque/failure/airbrake'
require 'resque/failure/redis'
require 'resque/failure/slack'

Resque.redis = Bakery::Application.config.redis
Resque.redis.auth(APP_CONFIG["redis_auth"]) if APP_CONFIG.has_key? "redis_auth"

Resque.after_fork = Proc.new do
  ActiveRecord::Base.establish_connection
  Sharethrough.establish_connection
end

Resque::Failure::Airbrake.configure do |config|
  config.api_key = APP_CONFIG['airbrake_api_key']
  config.secure = true
end

Resque::Failure::Slack.configure do |config|
  config.channel = 'eng-resque-errors'
  config.token = APP_CONFIG['slack_api_key']
  config.level = :compact
end

Resque::Failure::Multiple.classes = [Resque::Failure::Redis, Resque::Failure::Airbrake]
Resque::Failure::Multiple.classes << Resque::Failure::Slack if Rails.env.production?
Resque::Failure.backend = Resque::Failure::Multiple
To plug into Librato, we wrote some custom code to wrap all calls to the perform instance method of our jobs:
module Sharethrough
  module Job
    def self.included(klass)
      klass.extend(ClassMethods)
      klass.define_custom_error_class
    end

    module ClassMethods
      # default queue; individual jobs override this with their own queue name
      def queue
        :default
      end

      def perform(*args)
        instance = new
        instance.perform(*args)
      rescue => e
        ::Rails.logger.error(e.message)
        send_to_librato if Rails.env.production?
        raise wrap_error(e)
      end

      def send_to_librato
        librato.send_gauges({ librato_key => 1 })
      end

      def librato
        Sharethrough::Librato.new(
          APP_CONFIG['librato_username'],
          APP_CONFIG['librato_api_key']
        )
      end

      def librato_key
        "resque.#{queue.to_s}.errors"
      end

      def wrap_error(error)
        custom_error_class.new("#{error.class.name.demodulize} : #{error.inspect}").tap do |custom_error|
          custom_error.set_backtrace(error.backtrace)
        end
      end

      def custom_error_class
        [self.name, custom_error_classname].join('::').constantize
      end

      def custom_error_classname
        "#{self.name.demodulize}Error"
      end

      def define_custom_error_class
        self.const_set(custom_error_classname, Class.new(StandardError))
      end
    end
  end
end
Any job that needed to send metrics to Librato looked like this:
class SharethroughJob
  include Sharethrough::Job

  def self.queue
    :queue_name
  end

  def perform(object_id)
    # do work here
  end
end
Sidekiq Setup
To only enqueue unique jobs, we include the sidekiq-unique-jobs gem. This allows us to specify sidekiq_options unique: true for any job. Simply including the Airbrake gem is enough for it to automatically detect Sidekiq and catch any errors. There is also a nice error handler API that we plug into to enable Librato reporting.
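As a small illustration of per-job options (a sketch with a hypothetical job name, since in practice we set unique: true globally in the initializer below):

class ExampleUniqueJob
  include Sidekiq::Worker
  # sidekiq-unique-jobs picks up this option and skips duplicate enqueues
  sidekiq_options unique: true, queue: :default

  def perform(object_id)
    # do work here
  end
end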
We have also opted to take advantage of Sidekiq's ability to retry a job a certain number of times before it is considered "dead". This is what our Sidekiq initializer looks like:
Sidekiq.configure_server do |config|
  config.redis = { url: APP_CONFIG['redis_url'] }
  config.error_handlers << Proc.new { |ex, ctx_hash| Sharethrough::Sidekiq::LibratoErrorService.notify(ex, ctx_hash) }
  config.error_handlers << Proc.new { |ex, ctx_hash| Sharethrough::Sidekiq::SlackErrorService.notify(ex, ctx_hash) }
end

Sidekiq.configure_client do |config|
  config.redis = { url: APP_CONFIG['redis_url'] }
end

Sidekiq.default_worker_options = { 'backtrace' => true, 'retry' => 3, 'unique' => true }
Our LibratoErrorService looks like this:
module Sharethrough
  module Sidekiq
    class LibratoErrorService
      class << self
        def notify(exception, ctx_hash)
          send_to_librato(ctx_hash['queue'])
        end

        def send_to_librato(queue)
          librato.send_gauges("sidekiq.#{queue}.errors" => 1)
        end

        def librato
          Sharethrough::Librato.new(
            APP_CONFIG['librato_username'],
            APP_CONFIG['librato_api_key']
          )
        end
      end
    end
  end
end
Instead of being notified on Slack for every single error, we decided it would be better to be notified only if a job makes it into the dead queue. This occurs only after Sidekiq has exhausted its retries. This is what our SlackErrorService looks like:
module Sharethrough
  module Sidekiq
    class SlackErrorService
      class << self
        def notify(exception, ctx_hash)
          if ctx_hash['retry_count'] == 3 && Rails.env.production?
            text = ""
            text << "Message: *#{ctx_hash['error_message']}*\n"
            text << "Job Class: `#{ctx_hash['class']}`\n"
            text << "Error Class: `#{ctx_hash['error_class']}`\n"
            text << "Queue: _#{ctx_hash['queue']}_\n"
            text << "Args: #{ctx_hash['args'].inspect}"
            slack_client.chat_postMessage({ channel: channel, text: text })
          end
        end

        def slack_client
          Slack::Client.new
        end

        def channel
          'eng-sidekiq-errors'
        end
      end
    end
  end
end
Additionally, our concurrency and queue priority are defined in our sidekiq.yml file:
:concurrency: 5
:queues:
  - most_important_queue
  - second_most_important_queue
  - default
Deployment
At Sharethrough we use Capistrano to deploy our code and Upstart to manage processes on our Ubuntu servers. We define how to start our worker process in a Procfile:
worker: env bundle exec sidekiq
and execute a Capistrano task that converts this to Upstart scripts using the Foreman gem:
namespace :deploy do
  namespace :foreman do
    desc "Export the Procfile to upstart scripts"
    task :export, :roles => :worker, :on_no_matching_servers => :continue do
      run "cd #{release_path} && sudo bundle exec foreman export upstart /etc/init -a #{application} -u #{user} -l #{shared_path}/log -f #{release_path}/Procfile -c worker=1"
    end
  end
end
The Foreman gem generates the appropriate Upstart scripts for one worker process, and that process spawns 5 threads. To start our workers, all we need to do is execute service application_name start, and Upstart takes care of everything.
@mperham provides a more comprehensive Upstart script that he recommends – we are likely moving to something like this in the future.
Gotchas
The initial migration from Resque to Sidekiq was pretty painless, but when we deployed to production we ran into quite a few gotchas. The Problems and Troubleshooting section of the Sidekiq wiki proved to be very valuable.
Sidekiq is Fast
We noticed quite a few ActiveRecord::RecordNotFound: Couldn't find Model with 'id'=1234 failures. We observed that jobs throwing this error would fail once, retry, and then succeed. This was happening in models with an after_save callback that enqueued a job. Sidekiq is really, really fast. So fast that it will try to run a job before ActiveRecord has finished committing the database transaction that creates the model. We fixed this by using an after_commit hook to ensure the database record is created before we try to execute a job that depends on it:
class Model
  after_commit :enqueue

  # after_commit callbacks receive no arguments, so we pass the record's own id
  def enqueue
    Jobs::SharethroughJob.perform_async(id)
  end
end
If there are no ActiveRecord models used, another approach would be to enqueue the job with a time delay: Jobs::SharethroughJob.perform_in(5.seconds, object_id)
Rails Eager Loading
Another error we started to see intermittently was NameError: uninitialized constant Sharethrough::DependencyClassForJob. Essentially, helper classes used by our main worker class were not being loaded properly. This was due to the way Rails eager loads files. All our jobs are stored in the lib directory, so they were not automatically eager loaded. We explicitly required these files, which looked a bit ugly:
require '../sharethrough/dependency_class_for_job'
require '../sharethrough/dependency2'
require '../sharethrough/dependency3'

module Jobs
  class SharethroughJob
    include Sidekiq::Worker
  end
end
We pushed our changes and closed out this bug. Shortly after, we started to notice another error: RuntimeError: Circular dependency detected while autoloading constant Sharethrough::DependencyClassForJob. The class definition for Sharethrough::DependencyClassForJob referenced in our worker class hadn't finished loading, which triggered const_missing; ActiveSupport::Dependencies then tried to load Sharethrough::DependencyClassForJob, saw that it was already being loaded, and raised the circular dependency error.
After taking a couple of steps back and trying to understand why certain code wasn't being loaded properly, we stumbled on a section in the Sidekiq docs that explains why Sidekiq doesn't autoload my Rails application code.
Eager loading loads your code only once when starting the server — it does not reload your code between jobs. If you have code located outside of the standard Rails directories (e.g. app/models, app/controllers, etc.), Sidekiq will not see it unless you add the path to eager_load_paths in your Rails application configuration.
We could remove the require statements and add the following to our application.rb:
config.eager_load_paths += %W(
  #{config.root}/lib/jobs
  #{config.root}/lib/sharethrough
)
Note: if you previously included these directories in your config.autoload_paths, that is no longer necessary, since Rails also autoloads anything in the eager load paths.
ActiveRecord Connection Pooling
We use MySQL as our database and have a max connection pool setting of 5. Here is a snippet from our database.yml:
defaults: &defaults
  adapter: mysql2
  encoding: utf8
  pool: 5
This means that for every Rails process (both web and worker), there will be a limit of 5 connections to the database at any given time. We decided to roll out Sidekiq with a concurrency of 5 threads so that each thread would be able to obtain a connection to the database (by default, Sidekiq starts with 25 threads).
We noticed the following intermittent error: ActiveRecord::ConnectionTimeoutError: could not obtain a database connection within 5.000 seconds (waited 5.000 seconds). This didn't make any sense because, in theory, each thread could have its own dedicated connection to the database. Taking a peek at the database, it appeared that Sidekiq was occasionally leaking connections. We decided to mitigate this problem by wrapping any calls to the database in the thread-safe method ActiveRecord::Base.connection_pool.with_connection. Any job that needs to connect to ActiveRecord now inherits from:
module Jobs
  class BaseActiveRecordJob
    def perform(*args)
      ActiveRecord::Base.connection_pool.with_connection do
        perform_with_ar_connection(*args)
      end
    ensure
      ActiveRecord::Base.clear_active_connections!
      ActiveRecord::Base.connection.close
    end
  end
end
Our new job looks like:
module Jobs
  class SharethroughJob < BaseActiveRecordJob
    include Sidekiq::Worker

    def perform_with_ar_connection(object_id)
      # do work touching the db
    end
  end
end
Future Improvements
Now that all our jobs have moved over to Sidekiq and everything is stable, we would like to optimize our use of it. We are in the process of experimenting with increasing our concurrency, and we would also like to run more Sidekiq processes on our worker boxes.
We are also curious to assess Sidekiq Enterprise, which offers additional features such as improved reliability, batch jobs, and support.
We're going to try to port our job logic over to the new ActiveJob API, so that hopefully, in the future, swapping out message queue backends will be less painful.
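As a rough sketch of where that might land (assuming ActiveJob with the Sidekiq queue adapter; the class and queue names are illustrative), a job would look something like:

class SharethroughJob < ActiveJob::Base
  # the queue name here is illustrative
  queue_as :default

  def perform(object_id)
    # do work here
  end
end

with config.active_job.queue_adapter = :sidekiq set in application.rb.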