Faktory Worker Testing: Job Deduplication and Failure Handling

Faktory Worker Testing: Job Deduplication and Failure Handling

Faktory is a language-agnostic background job server created by Mike Perham (the creator of Sidekiq). Unlike Sidekiq, which is Redis-based and Ruby-only, Faktory is a standalone server with client libraries for Ruby, Go, Python, JavaScript, and other languages. Testing Faktory workers shares concepts with other queue systems but has important differences in how the server handles deduplication, failures, and the dead job queue.

Faktory Architecture

Faktory is a server process (distinct from your application) that stores and routes jobs. Your application uses a language-specific client library to push and pull jobs. The worker polls Faktory for jobs, processes them, and acknowledges (ACK) or rejects (FAIL) each job.

Key concepts:

  • ACK: Job completed successfully, remove from queue
  • FAIL: Job failed, schedule for retry or move to dead queue
  • Dead queue: Jobs that exhausted all retries
  • Unique jobs: Server-side deduplication (Faktory Pro feature)

Testing Approaches

Option 1: Test Worker Logic Directly

The cleanest approach: separate job logic from Faktory integration.

# workers/email_notification_worker.rb
class EmailNotificationWorker
  include Faktory::Job
  
  faktory_options queue: 'default', retry: 5
  
  def perform(user_id, template_name, context = {})
    perform_notification(user_id, template_name, context)
  end
  
  private
  
  # Testable method — no Faktory dependency
  def perform_notification(user_id, template_name, context)
    user = User.find(user_id)
    
    template = EmailTemplate.find_by!(name: template_name)
    rendered = template.render(context)
    
    Mailer.send_email(
      to: user.email,
      subject: rendered.subject,
      body: rendered.body
    )
    
    EmailLog.create!(
      user_id: user_id,
      template_name: template_name,
      sent_at: Time.current
    )
  end
end
# spec/workers/email_notification_worker_spec.rb
RSpec.describe EmailNotificationWorker do
  subject(:worker) { described_class.new }
  
  let(:user) { create(:user, email: 'alice@example.com') }
  let(:template) { create(:email_template, name: 'welcome', subject: 'Welcome!') }
  
  describe '#perform_notification (unit test)' do
    it 'sends email to user' do
      expect(Mailer).to receive(:send_email).with(
        to: user.email,
        subject: 'Welcome!',
        body: anything
      )
      
      worker.send(:perform_notification, user.id, 'welcome', {})
    end
    
    it 'creates an email log entry' do
      allow(Mailer).to receive(:send_email)
      
      expect {
        worker.send(:perform_notification, user.id, 'welcome', {})
      }.to change(EmailLog, :count).by(1)
    end
    
    it 'raises for unknown template' do
      expect {
        worker.send(:perform_notification, user.id, 'nonexistent', {})
      }.to raise_error(ActiveRecord::RecordNotFound)
    end
    
    it 'raises for unknown user' do
      expect {
        worker.send(:perform_notification, -1, 'welcome', {})
      }.to raise_error(ActiveRecord::RecordNotFound)
    end
  end
end

Option 2: Test with faktory_worker_ruby Test Helpers

# Gemfile
gem 'faktory_worker_ruby'

# In test configuration
require 'faktory/testing'
Faktory::Testing.fake!  # Queue jobs but don't process them
RSpec.describe 'Job enqueueing' do
  before do
    Faktory::Testing.fake!
    Faktory::Job.jobs.clear
  end
  
  after do
    Faktory::Testing.disable!
  end
  
  it 'enqueues a notification job when user signs up' do
    expect {
      UserRegistrationService.new.register(
        email: 'alice@example.com',
        name: 'Alice'
      )
    }.to change(EmailNotificationWorker.jobs, :size).by(1)
  end
  
  it 'enqueues with correct arguments' do
    user = create(:user)
    EmailNotificationWorker.perform_async(user.id, 'welcome')
    
    job = EmailNotificationWorker.jobs.last
    expect(job['args']).to eq([user.id, 'welcome'])
  end
  
  it 'enqueues to correct queue' do
    user = create(:user)
    EmailNotificationWorker.perform_async(user.id, 'critical_alert')
    
    job = EmailNotificationWorker.jobs.last
    expect(job['queue']).to eq('default')
  end
end

Option 3: Integration Testing Against Real Faktory

For integration tests, run Faktory locally (or in Docker):

# Start Faktory for testing
docker run -d -p 7419:7419 -p 7420:7420 contribsys/faktory:latest
# spec/support/faktory_helpers.rb
module FaktoryHelpers
  def wait_for_jobs(worker_class, timeout: 10)
    deadline = Time.now + timeout
    
    loop do
      return if worker_class.jobs.empty?
      raise "Timeout waiting for jobs" if Time.now > deadline
      sleep 0.1
    end
  end
  
  def run_faktory_worker(worker_class, &block)
    Faktory::Testing.inline! do
      block.call
    end
  end
end

RSpec.configure do |config|
  config.include FaktoryHelpers
end

Testing Job Deduplication

Faktory Pro includes server-side job deduplication. In the open-source version, you implement it client-side.

Client-Side Deduplication

module Deduplicatable
  extend ActiveSupport::Concern
  
  included do
    # Deduplication key based on job class and arguments
    def self.perform_unique(*args)
      dedup_key = "job:#{name}:#{args.to_json}"
      
      # Use Redis for deduplication tracking
      acquired = $redis.set(
        dedup_key,
        '1',
        nx: true,         # Only set if not exists
        ex: 3600          # 1 hour TTL
      )
      
      if acquired
        perform_async(*args)
      else
        Rails.logger.info "Duplicate job skipped: #{dedup_key}"
        nil
      end
    end
  end
  
  def perform(*)
    # Remove dedup key when job completes (optional — TTL handles it)
    $redis.del("job:#{self.class.name}:#{self.class.instance_variable_get(:@_args).to_json}")
    super
  end
end
RSpec.describe 'Job deduplication' do
  let(:user) { create(:user) }
  
  before do
    Faktory::Testing.fake!
    $redis.flushdb
  end
  
  it 'enqueues only one job for duplicate calls' do
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    
    expect(EmailNotificationWorker.jobs.size).to eq(1)
  end
  
  it 'allows different jobs with different args' do
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    EmailNotificationWorker.perform_unique(user.id, 'invoice')
    
    expect(EmailNotificationWorker.jobs.size).to eq(2)
  end
  
  it 'allows re-enqueue after TTL expires' do
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    
    # Expire the dedup key
    $redis.del("job:EmailNotificationWorker:[#{user.id},\"welcome\"]")
    
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    
    expect(EmailNotificationWorker.jobs.size).to eq(2)
  end
  
  it 'allows re-enqueue after job completes' do
    # First job
    Faktory::Testing.inline! do
      EmailNotificationWorker.perform_unique(user.id, 'welcome')
    end
    
    # After completion, can enqueue again
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
    
    expect(EmailNotificationWorker.jobs.size).to eq(1)
  end
end

Testing Failure Handling

Retry Configuration

class DataSyncWorker
  include Faktory::Job
  
  faktory_options(
    queue: 'sync',
    retry: 5,
    # Custom retry schedule: 1min, 5min, 30min, 2hr, 8hr
    retry_count: 5,
    custom: {
      'backtrace' => 10  # Capture 10 lines of backtrace on failure
    }
  )
  
  def perform(entity_type, entity_id)
    SyncService.sync(entity_type, entity_id)
  rescue SyncService::ServiceUnavailable => e
    # Retry with custom delay
    retry_job delay: 60
  rescue SyncService::InvalidDataError => e
    # Don't retry — log and give up
    SyncErrorLog.record(entity_type: entity_type, entity_id: entity_id, error: e.message)
    # Don't raise — job is done (as a failure)
  end
end
RSpec.describe DataSyncWorker do
  subject(:worker) { described_class.new }
  
  let(:entity_id) { 42 }
  
  describe 'failure handling' do
    context 'when service is unavailable' do
      before do
        allow(SyncService).to receive(:sync)
          .and_raise(SyncService::ServiceUnavailable, "503 Service Unavailable")
      end
      
      it 'enqueues retry' do
        expect(worker).to receive(:retry_job).with(delay: 60)
        worker.perform('order', entity_id)
      end
    end
    
    context 'when data is invalid' do
      before do
        allow(SyncService).to receive(:sync)
          .and_raise(SyncService::InvalidDataError, "Missing required field")
      end
      
      it 'logs the error and does not raise' do
        expect { worker.perform('order', entity_id) }.not_to raise_error
        
        expect(SyncErrorLog.last).to have_attributes(
          entity_type: 'order',
          entity_id: entity_id,
          error: 'Missing required field'
        )
      end
    end
    
    context 'when unexpected error occurs' do
      before do
        allow(SyncService).to receive(:sync)
          .and_raise(StandardError, "Unexpected failure")
      end
      
      it 'raises to trigger Faktory retry' do
        expect { worker.perform('order', entity_id) }
          .to raise_error(StandardError, "Unexpected failure")
      end
    end
  end
end

Testing the Dead Job Queue

After exhausting retries, Faktory moves jobs to the dead queue. Testing this requires inspecting the Faktory server:

# spec/workers/dead_queue_spec.rb
require 'faktory'

RSpec.describe 'Dead job queue behavior', :faktory_integration do
  
  # This requires a running Faktory server
  let(:client) { Faktory.server }
  
  before do
    # Clear dead queue before each test
    client.call('QUEUE', 'PAUSE', 'dead')
    # Note: clearing the dead queue requires Faktory Pro or direct Redis access
  end
  
  it 'records job in dead queue after exhausted retries', :slow do
    # Create a job that will fail
    class AlwaysFailsWorker
      include Faktory::Job
      faktory_options retry: 0  # No retries — go straight to dead queue
      
      def perform
        raise "Always fails"
      end
    end
    
    # Push the job
    AlwaysFailsWorker.perform_async
    
    # Run a worker briefly to process it
    run_worker_for(2.seconds)
    
    # Check dead queue (requires Faktory Pro API or monitoring)
    dead_count = check_dead_queue_size
    expect(dead_count).to be >= 1
  end
end

Testing Batch Job Completion

# For processing large numbers of records
class BulkProcessorWorker
  include Faktory::Job
  
  def self.enqueue_batch(record_ids)
    record_ids.each do |id|
      perform_async(id)
    end
  end
  
  def perform(record_id)
    Record.find(record_id).process!
  end
end

RSpec.describe BulkProcessorWorker do
  before { Faktory::Testing.fake! }
  
  it 'enqueues one job per record' do
    records = create_list(:record, 5)
    
    described_class.enqueue_batch(records.map(&:id))
    
    expect(described_class.jobs.size).to eq(5)
    expect(described_class.jobs.map { |j| j['args'].first })
      .to match_array(records.map(&:id))
  end
  
  context 'in inline mode' do
    around { |ex| Faktory::Testing.inline! { ex.run } }
    
    it 'processes all records' do
      records = create_list(:record, 3, status: 'pending')
      
      described_class.enqueue_batch(records.map(&:id))
      
      records.each(&:reload)
      expect(records.all? { |r| r.status == 'processed' }).to be true
    end
  end
end

Go Client Testing

Faktory's Go client is well-suited for testing since Go's interface system enables easy mocking:

// worker/processor.go
package worker

import (
    "context"
    "encoding/json"
    
    faktory "github.com/contribsys/faktory/client"
    worker "github.com/contribsys/faktory_worker_go"
)

type EmailProcessor struct {
    mailer  MailService
    users   UserRepository
    logger  Logger
}

// Pure function — testable without Faktory
func (p *EmailProcessor) ProcessEmailJob(ctx context.Context, args ...interface{}) error {
    if len(args) < 2 {
        return fmt.Errorf("expected at least 2 args, got %d", len(args))
    }
    
    userID := int64(args[0].(float64))
    templateName := args[1].(string)
    
    user, err := p.users.FindByID(ctx, userID)
    if err != nil {
        return fmt.Errorf("user not found: %w", err)
    }
    
    return p.mailer.Send(ctx, MailMessage{
        To:       user.Email,
        Template: templateName,
    })
}
// worker/processor_test.go
package worker_test

import (
    "context"
    "testing"
    
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
)

type MockMailService struct {
    mock.Mock
}

func (m *MockMailService) Send(ctx context.Context, msg MailMessage) error {
    args := m.Called(ctx, msg)
    return args.Error(0)
}

type MockUserRepo struct {
    mock.Mock
}

func (m *MockUserRepo) FindByID(ctx context.Context, id int64) (*User, error) {
    args := m.Called(ctx, id)
    return args.Get(0).(*User), args.Error(1)
}

func TestEmailProcessor_ProcessEmailJob(t *testing.T) {
    tests := []struct {
        name        string
        args        []interface{}
        setupMocks  func(*MockMailService, *MockUserRepo)
        expectError bool
        errorMsg    string
    }{
        {
            name: "sends email successfully",
            args: []interface{}{float64(1), "welcome"},
            setupMocks: func(mailer *MockMailService, users *MockUserRepo) {
                users.On("FindByID", mock.Anything, int64(1)).
                    Return(&User{ID: 1, Email: "alice@example.com"}, nil)
                mailer.On("Send", mock.Anything, MailMessage{
                    To:       "alice@example.com",
                    Template: "welcome",
                }).Return(nil)
            },
        },
        {
            name:        "fails with missing args",
            args:        []interface{}{float64(1)},
            expectError: true,
            errorMsg:    "expected at least 2 args",
        },
        {
            name: "fails when user not found",
            args: []interface{}{float64(999), "welcome"},
            setupMocks: func(mailer *MockMailService, users *MockUserRepo) {
                users.On("FindByID", mock.Anything, int64(999)).
                    Return((*User)(nil), errors.New("user not found"))
            },
            expectError: true,
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            mailer := &MockMailService{}
            users := &MockUserRepo{}
            
            if tt.setupMocks != nil {
                tt.setupMocks(mailer, users)
            }
            
            processor := &EmailProcessor{
                mailer: mailer,
                users:  users,
            }
            
            err := processor.ProcessEmailJob(context.Background(), tt.args...)
            
            if tt.expectError {
                assert.Error(t, err)
                if tt.errorMsg != "" {
                    assert.Contains(t, err.Error(), tt.errorMsg)
                }
            } else {
                assert.NoError(t, err)
            }
            
            mailer.AssertExpectations(t)
            users.AssertExpectations(t)
        })
    }
}

Faktory's language-agnostic nature means testing approaches vary by client. The consistent principle across all languages: extract the job's business logic into pure functions, test those separately from the Faktory client code, and reserve integration tests for the enqueueing/retry/DLQ behaviors that require real server interaction.

Read more