ActiveJob::Continuable - Interrupting and resuming jobs in Ruby on Rails
In modern web application development, running long tasks in the background is common practice. Ruby on Rails provides the ActiveJob framework for this, and Rails 8.1 added a very useful feature to it: ActiveJob::Continuable. In this article, we take a closer look at this feature, which lets you interrupt and resume jobs. That matters most for long-running operations and for application restarts.
What is ActiveJob::Continuable?
ActiveJob::Continuable is a module that lets ActiveJob jobs be interrupted and resumed. It is aimed at long-running operations that can be cut short by a server restart or a planned system shutdown. With ActiveJob::Continuable, a job saves its progress and continues from where it was interrupted instead of starting all over again.
How does it work?
The concept is simple: a job is divided into "steps", and within a step we can track progress with a "cursor". If the job is interrupted, whether by an application restart or for some other reason, then on its next run it skips the steps that have already been completed and resumes the unfinished step from its last saved cursor.
Let's see this with an example:
```ruby
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This code always executes, even when the job is resumed
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance! from: record.id # save progress: the cursor becomes record.id + 1
      end
    end

    step :finalize do
      @import.finalize!
    end
  end
end
```
In the example above, we define three steps: validation, record processing and finalization. If the job is interrupted while processing records, the resumed run skips the validation step and continues from the saved cursor, that is, from the record right after the last one processed.
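To make the skip-and-resume behavior concrete, here is a toy model in plain Ruby. It is a sketch of the idea, not how Rails implements continuations; ToyContinuation, run_import and the stopping flag are hypothetical names chosen for this illustration. The first run is interrupted at its first cursor update, and a second run, built from the saved state, skips the finished validation step and picks up the processing step at the saved cursor.

```ruby
# Toy model only: a plain-Ruby imitation of the step/cursor idea, not Rails' code.
# ToyContinuation, run_import and the `stopping` flag are hypothetical names.
class ToyContinuation
  Interrupted = Class.new(StandardError)

  attr_reader :completed, :current, :cursor
  attr_accessor :stopping

  def initialize(completed: [], current: nil, cursor: nil)
    @completed = completed.dup
    @current = current
    @cursor = cursor
    @stopping = false
  end

  # Skips steps finished in an earlier run; restores the saved cursor only
  # when resuming the step that was in progress, otherwise uses `start`.
  def step(name, start: nil)
    return if @completed.include?(name)

    @cursor = @current == name ? @cursor : start
    @current = name
    yield self
    @completed << name
    @current = nil
    @cursor = nil
  end

  # Saving the cursor doubles as a checkpoint: stop here if shutting down.
  def set!(value)
    @cursor = value
    raise Interrupted if stopping
  end
end

def run_import(continuation, items, log)
  continuation.step(:validate) { log << :validate }
  continuation.step(:process, start: 0) do |step|
    items[step.cursor..].each do |item|
      log << item
      step.set!(step.cursor + 1)
    end
  end
end

log = []
items = %w[a b c d]

first = ToyContinuation.new
first.stopping = true # pretend a shutdown started during the first run
begin
  run_import(first, items, log)
rescue ToyContinuation::Interrupted
  # A real job would now be re-enqueued with its progress serialized.
end

resumed = ToyContinuation.new(completed: first.completed, current: first.current, cursor: first.cursor)
run_import(resumed, items, log)
p log # => [:validate, "a", "b", "c", "d"]
```

In a real job, the saved state travels inside the serialized job data rather than being handed over manually as it is here.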
Defining Steps
We can define steps in two ways: with a block or by referencing a method. Let's look at both:
```ruby
class ExampleJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(id)
    @object = MyModel.find(id)

    # Option 1: with a block
    step :first_step do
      # step code
    end

    # Option 2: with a reference to a method
    step :second_step
  end

  private

  def second_step
    # step code
  end
end
```
A method used as a step can take either no arguments or a single argument, the step object. Here is an example with an argument:
```ruby
# Referenced from perform with: step :process_data
def process_data(step)
  @object.items.find_each(start: step.cursor) do |item|
    item.process
    step.advance! from: item.id
  end
end
```
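One plausible way to support both signatures is to inspect the method's arity. The sketch below is a hypothetical illustration in plain Ruby, not Rails' source; StepDispatchDemo and run_step are made-up names, and the integer 42 stands in for the step object.

```ruby
# Hypothetical sketch (not Rails' source): a method-based step is dispatched
# on Method#arity, so the method may take the step object or nothing at all.
class StepDispatchDemo
  def run_step(name, step)
    step_method = method(name)
    case step_method.arity
    when 0 then step_method.call
    when 1 then step_method.call(step)
    else raise ArgumentError, "#{name} must take zero or one argument"
    end
  end

  private

  def finalize
    "finalize ran without the step object"
  end

  def process_data(step)
    "process_data received #{step.inspect}"
  end
end

demo = StepDispatchDemo.new
puts demo.run_step(:finalize, 42)     # prints: finalize ran without the step object
puts demo.run_step(:process_data, 42) # prints: process_data received 42
```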
Cursors - tracking progress
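Cursors are the heart of the resume mechanism: they store progress within a step. A cursor can be any object that can be serialized as an argument to ActiveJob::Base.serialize, and it starts out as nil unless a start value is given. When a step resumes, its last cursor value is restored, and the code in the step is responsible for continuing from it. There are several ways to manipulate the cursor: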
Setting a specific value
```ruby
step :iterate_over_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set!((step.cursor || 0) + 1) # the cursor is nil until it is first set
  end
end
```
Defining a starting value
```ruby
step :iterate_over_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end
```
Incrementing the cursor
```ruby
step :iterate_over_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance! # calls succ on the current cursor value
  end
end
```
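Because advance! relies on succ, the cursor's class determines what "next" means. The plain-Ruby sketch below uses a hypothetical next_cursor helper to show this. Where the helper raises TypeError, Rails itself raises ActiveJob::Continuation::UnadvanceableCursorError, so cursors without succ have to be updated with set! instead.

```ruby
# Plain Ruby: advance! moves the cursor with #succ, so the cursor's class
# decides what "next" means. next_cursor is a hypothetical helper.
def next_cursor(cursor)
  unless cursor.respond_to?(:succ)
    raise TypeError, "#{cursor.class} does not implement succ"
  end
  cursor.succ
end

p next_cursor(41)   # => 42
p next_cursor("az") # => "ba"
begin
  next_cursor([0, 0])
rescue TypeError => e
  puts e.message    # prints: Array does not implement succ
end
```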
Advancing from a specific value
```ruby
step :process_records do |step|
  import.records.find_each(start: step.cursor) do |record|
    record.process
    step.advance! from: record.id # useful when IDs are not contiguous
  end
end
```
Using complex cursors
We can also use complex cursors, such as arrays, to track progress in nested loops:
```ruby
step :process_nested_records, start: [0, 0] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set!([account.id, record.id + 1])
    end
    step.set!([account.id + 1, 0])
  end
end
```
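A nested cursor is easier to trust once you trace a resume by hand. The following plain-Ruby simulation replaces the database with a made-up ACCOUNTS hash (account ID => sorted record IDs), and its start checks imitate find_each(start: ...), which includes the start value itself. process_nested is a hypothetical stand-in for the step body: the first call is cut short after three records, and the second resumes from the saved cursor.

```ruby
# Plain-Ruby simulation of the [account_id, record_id] cursor (not Rails code).
# ACCOUNTS is made-up data: account ID => record IDs, both in ascending order.
ACCOUNTS = { 1 => [10, 11], 2 => [20, 21, 22], 5 => [50] }.freeze

# Hypothetical stand-in for the step body. Returns [processed, cursor].
def process_nested(cursor, stop_after: nil)
  processed = []
  account_start = cursor[0]                 # like Account.find_each(start: ...)
  ACCOUNTS.each do |account_id, record_ids|
    next if account_id < account_start
    record_start = cursor[1]                # like records.find_each(start: ...)
    record_ids.each do |record_id|
      next if record_id < record_start
      processed << [account_id, record_id]
      cursor = [account_id, record_id + 1]  # step.set! after each record
      # Simulated interruption at that checkpoint
      return [processed, cursor] if stop_after && processed.size == stop_after
    end
    cursor = [account_id + 1, 0]            # step.set! after each account
  end
  [processed, cursor]
end

first, saved = process_nested([0, 0], stop_after: 3)
rest, _final = process_nested(saved)
p saved        # => [2, 21]
p first + rest # => [[1, 10], [1, 11], [2, 20], [2, 21], [2, 22], [5, 50]]
```

Together, the two runs visit every record exactly once. Setting the cursor to [account_id + 1, 0] after each account works even when account IDs have gaps (here, 2 jumps to 5), because the start bound is inclusive rather than exact.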
Checkpoints
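A checkpoint is a place where a job can safely be interrupted. At a checkpoint, the job calls queue_adapter.stopping? to check whether the worker is shutting down. If it is, the job raises an ActiveJob::Continuation::Interrupt exception and is automatically re-enqueued, with its progress (the completed steps, plus the current step and its cursor) stored in the serialized job data.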
A checkpoint occurs:
- Automatically at the end of each step
- Within a step, whenever you call set!, advance! or checkpoint!
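We can also create a checkpoint explicitly with the checkpoint! method. This suits steps that don't need a cursor, such as deleting records: after a restart, the records that were already deleted are simply no longer found.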
```ruby
step :delete_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end
```
Jobs are not terminated the moment the queue adapter reports that it is stopping: they keep running until the next checkpoint, or until the process itself is stopped. A long-running job should therefore reach checkpoints more often than the worker's shutdown timeout allows, so that it can exit cleanly before the process is killed.
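The following toy model shows why checkpoint frequency matters. items_run_before_stop is a hypothetical helper, and the actual item processing is left as a comment: the shutdown begins while item stop_at is being processed, but the job can only notice it at a checkpoint.

```ruby
# Toy model (hypothetical helper): a shutdown begins while item `stop_at` is
# being processed, but the job can only notice it at a checkpoint. Returns how
# many items ran before the job stopped.
def items_run_before_stop(total, stop_at:, checkpoint_every:)
  stopping = false
  (1..total).each do |n|
    stopping = true if n == stop_at # the queue adapter starts reporting stopping?
    # ... process item n ...
    return n if stopping && (n % checkpoint_every).zero? # checkpoint inside the step
  end
  total # the end of the step is a checkpoint too
end

p items_run_before_stop(1_000, stop_at: 10, checkpoint_every: 1)      # => 10
p items_run_before_stop(1_000, stop_at: 10, checkpoint_every: 100)    # => 100
p items_run_before_stop(1_000, stop_at: 10, checkpoint_every: 10_000) # => 1000
```

If each item takes roughly t seconds, a job that checkpoints every k items may keep running for about k × t seconds after shutdown begins; that is the figure to compare with the worker's shutdown timeout.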
Error handling
If a job raises an error that ActiveJob does not retry, the error is passed back to the queue backend and any progress made during the current execution is lost. To prevent this, the job is automatically retried if it raises an error after making progress, where progress means completing a step or advancing the cursor within the current step.
Queue adapter support
ActiveJob::Continuable calls the stopping? method on the queue adapter to find out whether the worker is shutting down. By default, this method returns false, so until an adapter implements it, jobs running on that adapter are never interrupted at checkpoints.
At the time of writing, the Test and Sidekiq adapters have built-in support. Other backends, such as Delayed Job, Resque or Solid Queue, need their own mechanism for detecting that the worker is stopping.
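What does implementing stopping? involve? At its core, the worker needs a flag that flips when shutdown begins. The sketch below is a hypothetical, framework-free illustration, not the API of any real adapter: ToyQueueAdapter, mark_stopping! and install_term_handler are made-up names, and a real adapter would hook into its backend's own shutdown lifecycle rather than necessarily trapping signals itself.

```ruby
# Hypothetical sketch, not a real adapter API: the worker flips a flag when
# shutdown begins, and stopping? reports it to the jobs that poll it.
class ToyQueueAdapter
  def initialize
    @stopping = false
  end

  # The predicate ActiveJob::Continuable checks at every checkpoint.
  def stopping?
    @stopping
  end

  # Called from the worker's shutdown path.
  def mark_stopping!
    @stopping = true
  end

  # One possible trigger: flip the flag when the process receives SIGTERM.
  def install_term_handler
    Signal.trap("TERM") { mark_stopping! }
  end
end

adapter = ToyQueueAdapter.new
p adapter.stopping? # => false
adapter.mark_stopping!
p adapter.stopping? # => true
```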
Comparison with other solutions
ActiveJob::Continuable draws inspiration from Shopify's job-iteration gem, but differs from it in several important ways:
- Continuations are only available for ActiveJob, so they do not provide custom enumerators like job-iteration does
- They allow multi-step flows
- They do not intercept the perform method
- They are lower-level: the job itself must create checkpoints and update cursors
Practical example
Let's imagine importing a large CSV file with several hundred thousand records. Using ActiveJob::Continuable, we can implement it like this:
```ruby
require "csv"

class ImportCsvJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(file_path)
    @file_path = file_path
    # Code outside steps runs again on every resume, so reuse the existing
    # import instead of creating a new one each time (assumes one import per file)
    @import = Import.find_or_create_by!(file_path: file_path) do |import|
      import.status = "processing"
    end

    step :validate_file do
      unless File.exist?(@file_path)
        @import.update(status: "failed", error: "File does not exist")
        return # exits perform, so the remaining steps are skipped
      end
    end

    step :load_headers do
      headers = CSV.open(@file_path, &:readline)
      @import.update(headers: headers)
    end

    step :process_rows, start: 0 do |step|
      CSV.foreach(@file_path, headers: true).with_index do |row, index|
        # Skip rows that were processed before the interruption
        next if index < step.cursor

        create_record_from_row(row)

        # Save progress: the cursor is the index of the next row to process
        step.set!(index + 1)

        # Log progress every 1,000 rows
        Rails.logger.info("Processed #{index + 1} rows") if ((index + 1) % 1000).zero?
      end
    end

    step :finalize do
      @import.update(status: "completed")
      Rails.logger.info("Import completed, all rows processed")
    end
  end

  private

  def create_record_from_row(row)
    # Logic to create a record from a CSV row
    ImportedRecord.create!(row.to_h)
  rescue => e
    ErrorLog.create(import_id: @import.id, row_data: row.to_h, error: e.message)
  end
end
```
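In the example above, if the server is restarted during the import, the job resumes from the row after the last saved cursor instead of starting from the beginning. The rows before the cursor are still read and skipped, because CSV.foreach streams the file from its start.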
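The resume logic of process_rows boils down to skipping rows whose position is below the cursor. The plain-Ruby sketch below checks that idea against a small in-memory CSV: SAMPLE is made-up data, and process_rows here is a hypothetical stand-in for the step body that returns the processed names together with the cursor step.set! would have saved.

```ruby
require "csv"

# Plain-Ruby check of the resume logic (not the job itself). SAMPLE is
# made-up input; process_rows returns [processed_names, new_cursor].
SAMPLE = <<~TEXT
  name,qty
  apple,1
  pear,2
  plum,3
TEXT

def process_rows(csv_text, cursor, stop_after: nil)
  names = []
  CSV.parse(csv_text, headers: true).each_with_index do |row, index|
    next if index < cursor # already processed in an earlier run
    names << row["name"]
    cursor = index + 1 # the value step.set! would persist
    break if stop_after && names.size == stop_after # simulated interruption
  end
  [names, cursor]
end

first, saved = process_rows(SAMPLE, 0, stop_after: 2)
rest, _final = process_rows(SAMPLE, saved)
p first # => ["apple", "pear"]
p saved # => 2
p rest  # => ["plum"]
```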
Summary
ActiveJob::Continuable is a powerful tool in the Ruby on Rails ecosystem for managing long-running jobs. With steps and cursors, we can write jobs that handle interruptions and application restarts safely.
Happy continuing!