ActiveJob::Continuable - Interrupting and resuming jobs in Ruby on Rails

In modern web application development, running long tasks in the background is common practice. Ruby on Rails provides the ActiveJob framework for this purpose and has recently introduced a particularly useful feature: ActiveJob::Continuable. In this article, we take a closer look at this feature, which lets jobs be interrupted and resumed. That is especially valuable for long-running operations and for jobs that must survive application restarts.

What is ActiveJob::Continuable?

ActiveJob::Continuable is a module that lets ActiveJob jobs be interrupted and resumed. This is most useful for long-running operations that can be cut short by a server restart or a planned shutdown. With ActiveJob::Continuable, a job saves its progress and, when it runs again, continues from where it was interrupted instead of starting over.

How does it work?

The concept is simple: a job is divided into "steps", and within each step progress is tracked with a "cursor". When the job is interrupted, whether by an application restart or for any other reason, the next execution skips the steps that were already completed and resumes the current step from the last saved cursor.

Let's see this with an example:

class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This code always executes, even after job resumption
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance! from: record.id # sets the cursor to record.id.succ, the next ID to start from
      end
    end

    step :finalize do
      @import.finalize!
    end
  end
end

In the above example, we define three steps: validation, record processing, and finalization. If the job is interrupted while processing records, the resumed execution skips the already completed validation step and continues processing from the ID after the last processed record.
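To make the skip-and-resume behavior concrete, here is a toy, in-memory model in plain Ruby. It is not how Rails implements continuations: ToyContinuation, its state hash, run_job and the simulated interruption are hypothetical names, and in Rails the progress is stored by the framework rather than in a local hash.

```ruby
# Toy model only: ToyContinuation and its state hash are hypothetical and do
# not reflect how Rails stores continuation state.
class ToyContinuation
  # Stands in for the interruption a real worker shutdown would cause.
  Interrupted = Class.new(StandardError)

  def initialize(state)
    @state = state
  end

  # Runs the block unless the step already completed in an earlier execution.
  def step(name, start: nil)
    return if @state[:completed].include?(name)

    # Keep the saved cursor only if this step was in progress when interrupted.
    @state[:cursor] = start unless @state[:current] == name
    @state[:current] = name
    yield self
    @state[:completed] << name
    @state[:current] = nil
  end

  def cursor
    @state[:cursor]
  end

  def set!(value)
    @state[:cursor] = value
  end
end

# A job body with the same shape as ProcessImportJob above.
def run_job(continuation, items, log, interrupt_before: nil)
  continuation.step(:validate) { log << :validated }

  continuation.step(:process, start: 0) do |step|
    items[step.cursor..].each do |item|
      raise ToyContinuation::Interrupted if item == interrupt_before
      log << item
      step.set!(step.cursor + 1)
    end
  end

  continuation.step(:finalize) { log << :finalized }
end

state = { completed: [], current: nil, cursor: nil }
log = []
items = %w[a b c d]

begin
  run_job(ToyContinuation.new(state), items, log, interrupt_before: "c")
rescue ToyContinuation::Interrupted
  # state now records: validate completed, process in progress, cursor 2
end

# Second execution: validate is skipped and process resumes at "c".
run_job(ToyContinuation.new(state), items, log)
```

Each item is logged exactly once, and the validation block runs only in the first execution.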

Defining Steps

We can define steps in two ways: with a block or by referencing a method. Let's look at both:

class ExampleJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(id)
    @object = MyModel.find(id)

    # Method 1: with a block
    step :first_step do
      # step code
    end

    # Method 2: with a method reference
    step :second_step
  end

  private

  def second_step
    # step code
  end
end

A step method can take either no arguments or a single argument, the step object. Here is a method that takes the step:

# Referenced from perform as: step :process_data
def process_data(step)
  @object.items.find_each(start: step.cursor) do |item|
    item.process
    step.advance! from: item.id
  end
end
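One plausible way a framework can accept both method shapes is to check Method#arity before calling. The sketch below is plain Ruby; ArityDemoJob and call_step_method are hypothetical names, and this is not necessarily how Rails dispatches step methods internally.

```ruby
# Hypothetical job-like class; call_step_method illustrates arity-based
# dispatch and is not Rails' internal code.
class ArityDemoJob
  def call_step_method(name, step)
    step_method = method(name) # Object#method also finds private methods
    # Method#arity is 0 for `def first_step` and 1 for `def process_data(step)`
    step_method.arity.zero? ? step_method.call : step_method.call(step)
  end

  private

  def first_step
    "called without the step"
  end

  def process_data(step)
    "called with #{step}"
  end
end

job = ArityDemoJob.new
without_step = job.call_step_method(:first_step, :fake_step)
with_step = job.call_step_method(:process_data, :fake_step)
```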

Cursors - tracking progress

Cursors are the heart of the resume mechanism: they store progress information within a step. A cursor can be any object that ActiveJob can serialize as a job argument. If no starting value is given, the cursor is nil when the step begins. There are several ways to update the cursor:

Setting a specific value

step :iterate_over_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set!((step.cursor || 0) + 1) # the cursor is nil until it is first set
  end
end

Defining a starting value

step :iterate_over_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set! step.cursor + 1
  end
end

Incrementing the cursor

step :iterate_over_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!  # calls succ on the current cursor value
  end
end
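Because advance! relies on succ, the cursor has to be an object that responds to it. This plain-Ruby snippet shows the successor values a few common cursor types produce, and that a Float has no successor.

```ruby
require "date"

# Successor values that a succ-based advance would step through.
examples = {
  integer: 41.succ,                 # next integer
  string: "az".succ,                # strings roll over like an odometer
  date: Date.new(2024, 2, 28).succ  # next calendar day (2024 is a leap year)
}

examples.each { |type, value| puts "#{type}: #{value}" }

# Floats do not implement succ, so they cannot be advanced this way.
puts "Float responds to succ? #{1.5.respond_to?(:succ)}"
```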

Advancing from a specific value

step :process_records do |step|
  import.records.find_each(start: step.cursor) do |record|
    record.process
    step.advance! from: record.id  # stores record.id.succ, so it works even when IDs have gaps
  end
end
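Storing record.id.succ pairs naturally with an inclusive start: value, because the resumed query begins exactly after the last processed record, whatever gaps the IDs have. The sketch simulates this in plain Ruby; RECORD_IDS and each_id_from are hypothetical stand-ins for a table and for find_each(start:).

```ruby
# Record IDs with gaps, as a table might have after deletions.
RECORD_IDS = [3, 7, 8, 15, 42].freeze

# Hypothetical stand-in for find_each(start:): yields IDs >= start, in order.
def each_id_from(start)
  RECORD_IDS.each { |id| yield id if start.nil? || id >= start }
end

processed = []
cursor = nil

# First execution: stop after two records, mimicking an interruption.
each_id_from(cursor) do |id|
  processed << id
  cursor = id.succ # what `advance! from: id` stores
  break if processed.size == 2
end

# Resumed execution: starts from the saved cursor, 8 (that is, 7.succ).
each_id_from(cursor) do |id|
  processed << id
  cursor = id.succ
end
```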

Using complex cursors

We can also use complex cursors, such as arrays, to track progress in nested loops:

step :process_nested_records, start: [0, 0] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set! [account.id, record.id + 1]
    end
    step.set! [account.id + 1, 0]
  end
end
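The following plain-Ruby simulation walks through the nested pattern with an interruption in the middle of the second account. ACCOUNTS and run_nested are hypothetical; the two `next if` lines stand in for the two find_each(start:) calls, and the early return stands in for an interruption.

```ruby
# Hypothetical data: account ID => sorted record IDs.
ACCOUNTS = { 1 => [10, 11], 2 => [20, 21, 22], 4 => [40] }.freeze

def run_nested(cursor, processed, stop_after: nil)
  ACCOUNTS.each do |account_id, record_ids|
    next if account_id < cursor[0]  # Account.find_each(start: cursor[0])

    record_ids.each do |record_id|
      next if record_id < cursor[1] # records.find_each(start: cursor[1])

      processed << [account_id, record_id]
      cursor = [account_id, record_id + 1]
      return cursor if processed.size == stop_after # simulated interruption
    end
    # Account finished: move to the next account and reset the inner cursor.
    cursor = [account_id + 1, 0]
  end
  cursor
end

processed = []
saved = run_nested([0, 0], processed, stop_after: 3) # first execution
final = run_nested(saved, processed)                 # resumed execution
```

Every record appears in processed exactly once, even though the interruption happened partway through account 2.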

Checkpoints

A checkpoint is a place where a job can safely be interrupted. At a checkpoint, the job checks whether the queue adapter is stopping (queue_adapter.stopping?). If it is, the job raises an ActiveJob::Continuation::Interrupt exception.

Checkpoints are created automatically:

  • At the end of each step
  • When calling set!, advance! or checkpoint! within a step

We can also create a checkpoint explicitly using the checkpoint! method:

step :delete_records do |step|
  import.records.find_each do |record|
    record.destroy!
    step.checkpoint!
  end
end

Jobs are not terminated as soon as the queue adapter is marked as stopping: they keep running until they reach the next checkpoint, or until the process itself is stopped. It is therefore important to create checkpoints frequently in long-running jobs.
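This toy simulation shows why checkpoint frequency matters: shutdown begins after item 4, but the job only looks at the adapter at its checkpoints, which here come every third item, so it keeps working until item 6. ToyAdapter, ToyInterrupt, JobState and process_with_checkpoints are hypothetical; in Rails, set! and advance! already create a checkpoint on every call.

```ruby
ToyInterrupt = Class.new(StandardError) # stands in for Continuation::Interrupt
JobState = Struct.new(:cursor, :processed)

# Hypothetical adapter exposing the stopping? question.
class ToyAdapter
  def initialize
    @stopping = false
  end

  def stop!
    @stopping = true
  end

  def stopping?
    @stopping
  end
end

def process_with_checkpoints(items, adapter, state, checkpoint_every:, stop_after: nil)
  items[state.cursor..].each do |item|
    state.processed << item
    state.cursor += 1
    # Simulate a deploy that starts shutting the worker down mid-step.
    adapter.stop! if state.processed.size == stop_after
    # Checkpoint: the only place the job looks at the adapter's state.
    if (state.cursor % checkpoint_every).zero? && adapter.stopping?
      raise ToyInterrupt
    end
  end
end

items = (1..10).to_a
state = JobState.new(0, [])
interrupted_at = nil

begin
  process_with_checkpoints(items, ToyAdapter.new, state, checkpoint_every: 3, stop_after: 4)
rescue ToyInterrupt
  interrupted_at = state.cursor
end

# A fresh worker resumes from the saved cursor.
process_with_checkpoints(items, ToyAdapter.new, state, checkpoint_every: 3)
```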

Error handling

If a job raises an error that ActiveJob does not retry, the progress made in the current execution is lost. To prevent this, the job is retried automatically if it raises an error after making progress, defined as completing a step or advancing the cursor within the current step.
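The progress rule can be restated as a small predicate. This is only a plain-Ruby restatement of the rule above, not Rails' code; Progress and retry_after_error? are hypothetical names.

```ruby
# Hypothetical snapshot of a job's progress at the start of an execution
# and at the moment an error is raised.
Progress = Struct.new(:completed_steps, :cursor)

# Retry automatically only if the failed execution completed a step or
# moved the cursor of the current step.
def retry_after_error?(at_start, at_error)
  completed_a_step = at_error.completed_steps.size > at_start.completed_steps.size
  moved_cursor = at_error.cursor != at_start.cursor
  completed_a_step || moved_cursor
end

at_start = Progress.new([:validate], 100)

no_progress = retry_after_error?(at_start, Progress.new([:validate], 100))
cursor_moved = retry_after_error?(at_start, Progress.new([:validate], 250))
step_done = retry_after_error?(at_start, Progress.new([:validate, :process_records], nil))
```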

Queue adapter support

ActiveJob::Continuable calls the stopping? method on the queue adapter to find out whether the worker is shutting down. By default, this method returns false, so an adapter has to implement it before its jobs can be interrupted.

At the time of writing, the Test and Sidekiq adapters have built-in support. Other adapters, such as Delayed Job, Resque or Solid Queue, need to implement stopping? so that they can report when their workers are shutting down.
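One plausible way to feed a shutdown signal into stopping? is to flip a flag when the worker process receives SIGTERM. In a Rails app, stopping? would be defined on the adapter class (for example, a subclass of ActiveJob::QueueAdapters::AbstractAdapter); that class is not used here so the sketch runs without Rails, and ToyWorker and ToyQueueAdapter are hypothetical.

```ruby
# Hypothetical worker process that knows when it is shutting down.
class ToyWorker
  def initialize
    @stopping = false
  end

  # Call once at boot: flips the flag when the process receives SIGTERM.
  def install_signal_handler
    Signal.trap("TERM") { begin_shutdown! }
  end

  def begin_shutdown!
    @stopping = true
  end

  def stopping?
    @stopping
  end
end

# Hypothetical adapter: answers the question Continuable asks at checkpoints
# by delegating to the worker.
class ToyQueueAdapter
  def initialize(worker)
    @worker = worker
  end

  def stopping?
    @worker.stopping?
  end
end

worker = ToyWorker.new
adapter = ToyQueueAdapter.new(worker)
before_shutdown = adapter.stopping?
worker.begin_shutdown! # what the TERM handler would do
after_shutdown = adapter.stopping?
```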

Comparison with other solutions

ActiveJob::Continuable draws inspiration from Shopify's job-iteration gem, but introduces several important differences:

  1. Continuations are only available for ActiveJob, so they do not provide custom enumerators like job-iteration
  2. They allow multi-step flows
  3. They do not intercept the perform method
  4. They are more low-level - they require manual creation of checkpoints and cursor updates

Practical example

Let's imagine importing a large CSV file with several hundred thousand records. Using ActiveJob::Continuable, we can implement the import like this:

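require "csv"

class ImportCsvJob < ApplicationJob
  include ActiveJob::Continuable

  # Enqueue with an existing record, for example:
  #   import = Import.create!(status: "processing", file_path: path)
  #   ImportCsvJob.perform_later(import.id)
  def perform(import_id)
    # perform runs again on every resumption, so load the existing Import
    # instead of creating a new record each time
    @import = Import.find(import_id)
    @file_path = @import.file_path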

    step :validate_file do
      unless File.exist?(@file_path)
        @import.update(status: "failed", error: "File does not exist")
        return # returns from perform, so the remaining steps are skipped
      end
    end

    step :load_headers do
      headers = CSV.open(@file_path, &:readline)
      @import.update(headers: headers)
    end

    step :process_rows, start: 0 do |step|
      # The cursor counts data rows already processed. with_index numbers
      # every row, including skipped ones, so resuming lands on the right row
      CSV.foreach(@file_path, headers: true).with_index do |row, index|
        next if index < step.cursor

        # Processing row
        create_record_from_row(row)

        # Updating cursor: the next execution starts after this row
        step.set!(index + 1)

        # Optional: log progress every 1,000 rows
        if ((index + 1) % 1000).zero?
          Rails.logger.info("Processed #{index + 1} rows")
        end
      end
    end

    step :finalize do
      @import.update(status: "completed")
      Rails.logger.info("Import completed, all rows processed")
    end
  end

  private

  def create_record_from_row(row)
    # Logic to create record from CSV row
    ImportedRecord.create!(row.to_h)
  rescue => e
    ErrorLog.create(import_id: @import.id, row_data: row.to_h, error: e.message)
  end
end

In the above example, if the server is restarted during the import, the job resumes from the row after the last processed one instead of starting from the beginning.
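The core of the process_rows step is positional: skip rows whose index is below the cursor, then store the index of the next row. The key detail is that the row index must advance for skipped rows as well as processed ones. This plain-Ruby sketch uses the standard csv library on an in-memory string; ROWS and import_rows are hypothetical, and the early return stands in for an interruption.

```ruby
require "csv"

ROWS = <<~TEXT
  name,email
  Ann,ann@example.com
  Bob,bob@example.com
  Cy,cy@example.com
  Di,di@example.com
TEXT

# Hypothetical stand-in for the process_rows step: `cursor` is the number of
# data rows already handled, so rows with a smaller index are skipped.
def import_rows(csv_text, cursor, imported, stop_after: nil)
  CSV.parse(csv_text, headers: true).each_with_index do |row, index|
    next if index < cursor

    imported << row["name"]
    cursor = index + 1 # what step.set! would store
    return cursor if imported.size == stop_after # simulated interruption
  end
  cursor
end

imported = []
saved_cursor = import_rows(ROWS, 0, imported, stop_after: 2)
final_cursor = import_rows(ROWS, saved_cursor, imported)
```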

Summary

ActiveJob::Continuable is a powerful tool in the Ruby on Rails ecosystem that allows you to efficiently manage long-running jobs. Using the mechanism of steps and cursors, we can implement jobs that safely handle interruptions and application restarts.

Happy continuing!