Faktory Worker Testing: Job Deduplication and Failure Handling
Faktory is a language-agnostic background job server created by Mike Perham (the creator of Sidekiq). Unlike Sidekiq, which is Redis-based and Ruby-only, Faktory is a standalone server with client libraries for Ruby, Go, Python, JavaScript, and other languages. Testing Faktory workers shares concepts with other queue systems but has important differences in how the server handles deduplication, failures, and the dead job queue.
Faktory Architecture
Faktory is a server process (distinct from your application) that stores and routes jobs. Your application uses a language-specific client library to push and pull jobs. The worker polls Faktory for jobs, processes them, and acknowledges (ACK) or rejects (FAIL) each job.
Key concepts:
- ACK: Job completed successfully, remove from queue
- FAIL: Job failed, schedule for retry or move to dead queue
- Dead queue: Jobs that exhausted all retries
- Unique jobs: Server-side deduplication (Faktory Pro feature)
Testing Approaches
Option 1: Test Worker Logic Directly
The cleanest approach: separate job logic from Faktory integration.
# workers/email_notification_worker.rb
# Faktory job that emails a user from a named template and records the send.
class EmailNotificationWorker
  include Faktory::Job
  faktory_options queue: 'default', retry: 5

  # Faktory entry point; forwards every argument to #perform_notification.
  def perform(user_id, template_name, context = {})
    perform_notification(user_id, template_name, context)
  end

  private

  # Testable method — no Faktory dependency.
  # Looks up the user and template, renders the template with +context+,
  # sends the mail, then writes an EmailLog row.
  def perform_notification(user_id, template_name, context)
    recipient = User.find(user_id)
    message = EmailTemplate.find_by!(name: template_name).render(context)

    Mailer.send_email(to: recipient.email, subject: message.subject, body: message.body)
    EmailLog.create!(user_id: user_id, template_name: template_name, sent_at: Time.current)
  end
end
# spec/workers/email_notification_worker_spec.rb
RSpec.describe EmailNotificationWorker do
subject(:worker) { described_class.new }
let(:user) { create(:user, email: 'alice@example.com') }
# Eager (let!) on purpose: no example references `template`, so a lazy `let`
# would never create the 'welcome' row that the worker looks up by name.
let!(:template) { create(:email_template, name: 'welcome', subject: 'Welcome!') }
describe '#perform_notification (unit test)' do
  # Calls the private method under test with an empty rendering context.
  def notify(user_id, template_name)
    worker.send(:perform_notification, user_id, template_name, {})
  end

  it 'sends email to user' do
    expected_mail = { to: user.email, subject: 'Welcome!', body: anything }
    expect(Mailer).to receive(:send_email).with(**expected_mail)

    notify(user.id, 'welcome')
  end

  it 'creates an email log entry' do
    allow(Mailer).to receive(:send_email)

    expect { notify(user.id, 'welcome') }.to change(EmailLog, :count).by(1)
  end

  it 'raises for unknown template' do
    expect { notify(user.id, 'nonexistent') }.to raise_error(ActiveRecord::RecordNotFound)
  end

  it 'raises for unknown user' do
    expect { notify(-1, 'welcome') }.to raise_error(ActiveRecord::RecordNotFound)
  end
end
end
Option 2: Test with faktory_worker_ruby Test Helpers
# Gemfile
gem 'faktory_worker_ruby'
# In test configuration
require 'faktory/testing'
Faktory::Testing.fake! # Queue jobs but don't process them
RSpec.describe 'Job enqueueing' do
# Start every example in fake mode with empty queues.
# Faktory::Job.jobs returns a flattened copy of the queued jobs, so calling
# .clear on it leaves the real queues untouched; clear_all empties them.
before do
  Faktory::Testing.fake!
  Faktory::Job.clear_all
end

# Leave test mode so later specs are not affected by this group.
after do
  Faktory::Testing.disable!
end

it 'enqueues a notification job when user signs up' do
  expect {
    UserRegistrationService.new.register(
      email: 'alice@example.com',
      name: 'Alice'
    )
  }.to change(EmailNotificationWorker.jobs, :size).by(1)
end

it 'enqueues with correct arguments' do
  user = create(:user)
  EmailNotificationWorker.perform_async(user.id, 'welcome')

  job = EmailNotificationWorker.jobs.last
  expect(job['args']).to eq([user.id, 'welcome'])
end

it 'enqueues to correct queue' do
  user = create(:user)
  EmailNotificationWorker.perform_async(user.id, 'critical_alert')

  job = EmailNotificationWorker.jobs.last
  expect(job['queue']).to eq('default')
end
end
Option 3: Integration Testing Against Real Faktory
For integration tests, run Faktory locally (or in Docker):
# Start Faktory for testing
docker run -d -p 7419:7419 -p 7420:7420 contribsys/faktory:latest
# spec/support/faktory_helpers.rb
# Spec helpers for waiting on and running Faktory jobs.
module FaktoryHelpers
  # Polls until +worker_class.jobs+ is empty.
  #
  # @param worker_class [#jobs] object whose +jobs+ collection responds to +empty?+
  # @param timeout [Numeric] seconds to wait before giving up
  # @param interval [Numeric] seconds to sleep between polls
  # @return [nil] once the collection is empty
  # @raise [RuntimeError] "Timeout waiting for jobs" when the deadline passes
  def wait_for_jobs(worker_class, timeout: 10, interval: 0.1)
    # Monotonic clock: a wall-clock adjustment (NTP, DST) must not shorten or
    # extend the wait.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    loop do
      return if worker_class.jobs.empty?
      raise "Timeout waiting for jobs" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

      sleep interval
    end
  end

  # Runs the given block with Faktory's inline test mode enabled.
  #
  # @param worker_class [Object] accepted for call-site readability; not used here
  # @yield code that enqueues jobs to be executed inline
  # @raise [ArgumentError] when no block is given
  def run_faktory_worker(worker_class, &block)
    raise ArgumentError, 'run_faktory_worker requires a block' unless block

    Faktory::Testing.inline! do
      block.call
    end
  end
end
RSpec.configure do |config|
config.include FaktoryHelpers
end
Testing Job Deduplication
Faktory Pro includes server-side job deduplication. In the open-source version, you implement it client-side.
Client-Side Deduplication
module Deduplicatable
extend ActiveSupport::Concern

# Seconds an enqueued job blocks identical enqueues if it never completes.
DEDUP_TTL = 3600

# Prepended in front of the worker's own #perform. An included module sits
# behind the class in the ancestor chain, so a #perform defined on the
# concern itself would never run; prepending guarantees this wrapper does.
module CompletionHook
  # Runs the real job, then releases its dedup key.
  # The key is kept when the job raises, because a failed job is still
  # pending retry and must keep blocking duplicates.
  def perform(*args)
    result = super
    $redis.del(self.class.dedup_key(*args))
    result
  end
end

included do
  prepend CompletionHook
end

class_methods do
  # Deduplication key based on job class and arguments.
  #
  # @return [String] e.g. job:EmailNotificationWorker:[1,"welcome"]
  def dedup_key(*args)
    "job:#{name}:#{args.to_json}"
  end

  # Enqueues the job unless an identical one (same class and args) is
  # already tracked in Redis.
  #
  # @return [Object, nil] whatever perform_async returns, or nil for a duplicate
  def perform_unique(*args)
    key = dedup_key(*args)

    # NX: only set if not exists; EX: expire so a lost job cannot block forever.
    acquired = $redis.set(key, '1', nx: true, ex: DEDUP_TTL)
    unless acquired
      Rails.logger.info "Duplicate job skipped: #{key}"
      return nil
    end

    begin
      perform_async(*args)
    rescue StandardError
      # Nothing was enqueued, so do not leave the key blocking future attempts.
      $redis.del(key)
      raise
    end
  end
end
end
RSpec.describe 'Job deduplication' do
let(:user) { create(:user) }

# Fake mode, empty job queues and an empty Redis DB for every example;
# without clearing the queues the size assertions below depend on run order.
before do
  Faktory::Testing.fake!
  Faktory::Job.clear_all
  $redis.flushdb
end

it 'enqueues only one job for duplicate calls' do
  EmailNotificationWorker.perform_unique(user.id, 'welcome')
  EmailNotificationWorker.perform_unique(user.id, 'welcome')

  expect(EmailNotificationWorker.jobs.size).to eq(1)
end

it 'allows different jobs with different args' do
  EmailNotificationWorker.perform_unique(user.id, 'welcome')
  EmailNotificationWorker.perform_unique(user.id, 'invoice')

  expect(EmailNotificationWorker.jobs.size).to eq(2)
end

it 'allows re-enqueue after TTL expires' do
  EmailNotificationWorker.perform_unique(user.id, 'welcome')

  # Simulate expiry by deleting the dedup key
  $redis.del("job:EmailNotificationWorker:[#{user.id},\"welcome\"]")
  EmailNotificationWorker.perform_unique(user.id, 'welcome')

  expect(EmailNotificationWorker.jobs.size).to eq(2)
end

it 'allows re-enqueue after job completes' do
  # Inline mode really executes the job, so it needs a template to render
  # and must not send real mail.
  create(:email_template, name: 'welcome', subject: 'Welcome!')
  allow(Mailer).to receive(:send_email)

  # First job runs to completion inline
  Faktory::Testing.inline! do
    EmailNotificationWorker.perform_unique(user.id, 'welcome')
  end

  # After completion, the same job can be enqueued again
  EmailNotificationWorker.perform_unique(user.id, 'welcome')

  expect(EmailNotificationWorker.jobs.size).to eq(1)
end
end
Testing Failure Handling
Retry Configuration
class DataSyncWorker
include Faktory::Job

# `backtrace` is a top-level job option (lines of backtrace kept on failure),
# not a key inside `custom`. The former `retry_count` entry duplicated `retry`
# and did not configure a retry schedule, so it is dropped.
faktory_options(
  queue: 'sync',
  retry: 5,
  backtrace: 10 # Capture 10 lines of backtrace on failure
)

# Syncs one entity through SyncService.
#
# - SyncService::ServiceUnavailable: asks for a delayed retry via retry_job.
# - SyncService::InvalidDataError: records the error and returns without
#   raising, so the job is not retried.
# - Any other error propagates to the caller.
def perform(entity_type, entity_id)
  SyncService.sync(entity_type, entity_id)
rescue SyncService::ServiceUnavailable
  # NOTE(review): retry_job is not defined in this class; presumably it is
  # provided elsewhere — confirm, otherwise this line raises NoMethodError.
  retry_job delay: 60
rescue SyncService::InvalidDataError => e
  # Don't retry — log and give up
  SyncErrorLog.record(entity_type: entity_type, entity_id: entity_id, error: e.message)
  # Don't raise — job is done (as a failure)
end
end
RSpec.describe DataSyncWorker do
subject(:worker) { described_class.new }

let(:entity_id) { 42 }

describe 'failure handling' do
  # Makes SyncService.sync raise the given error for the current example.
  def fail_sync_with(error_class, message)
    allow(SyncService).to receive(:sync).and_raise(error_class, message)
  end

  context 'when service is unavailable' do
    before { fail_sync_with(SyncService::ServiceUnavailable, "503 Service Unavailable") }

    it 'enqueues retry' do
      expect(worker).to receive(:retry_job).with(delay: 60)

      worker.perform('order', entity_id)
    end
  end

  context 'when data is invalid' do
    before { fail_sync_with(SyncService::InvalidDataError, "Missing required field") }

    it 'logs the error and does not raise' do
      expect { worker.perform('order', entity_id) }.not_to raise_error

      logged = SyncErrorLog.last
      expect(logged).to have_attributes(
        entity_type: 'order',
        entity_id: entity_id,
        error: 'Missing required field'
      )
    end
  end

  context 'when unexpected error occurs' do
    before { fail_sync_with(StandardError, "Unexpected failure") }

    it 'raises to trigger Faktory retry' do
      expect { worker.perform('order', entity_id) }
        .to raise_error(StandardError, "Unexpected failure")
    end
  end
end
end
Testing the Dead Job Queue
After exhausting retries, Faktory moves jobs to the dead queue. Testing this requires inspecting the Faktory server:
# spec/workers/dead_queue_spec.rb
require 'faktory'
RSpec.describe 'Dead job queue behavior', :faktory_integration do
# This requires a running Faktory server
# NOTE(review): in faktory_worker_ruby, Faktory.server appears to be
# block-based (it yields a pooled connection) — confirm that calling it
# without a block returns a usable client.
let(:client) { Faktory.server }
before do
# Intended to reset the dead queue before each test.
# NOTE(review): this issues QUEUE PAUSE for a queue named 'dead'; by its name
# that pauses rather than clears — confirm the command and that the client
# exposes #call.
client.call('QUEUE', 'PAUSE', 'dead')
# Note: clearing the dead queue requires Faktory Pro or direct Redis access
end
# End-to-end check: a job with retry: 0 that raises should be counted in the
# dead queue after a worker has processed it.
it 'records job in dead queue after exhausted retries', :slow do
# Create a job that will fail
# The class is defined inside the example body, so the constant stays
# defined after this example finishes.
class AlwaysFailsWorker
include Faktory::Job
faktory_options retry: 0 # No retries — go straight to dead queue
def perform
raise "Always fails"
end
end
# Push the job
AlwaysFailsWorker.perform_async
# Run a worker briefly to process it
# NOTE(review): run_worker_for is not defined in this snippet; presumably a
# support helper — TODO confirm.
run_worker_for(2.seconds)
# Check dead queue (requires Faktory Pro API or monitoring)
# NOTE(review): check_dead_queue_size is likewise not defined here — TODO confirm.
dead_count = check_dead_queue_size
# >= rather than == because the dead queue is not emptied beforehand.
expect(dead_count).to be >= 1
end
end
Testing Batch Job Completion
# For processing large numbers of records
# Faktory job that processes a single Record; batches are fanned out as one
# job per record ID.
class BulkProcessorWorker
  include Faktory::Job

  # Enqueues one job for each ID in +record_ids+ and returns +record_ids+.
  def self.enqueue_batch(record_ids)
    record_ids.each { |record_id| perform_async(record_id) }
  end

  # Loads the record (raising if it is missing) and processes it.
  def perform(record_id)
    record = Record.find(record_id)
    record.process!
  end
end
RSpec.describe BulkProcessorWorker do
# Fake mode with empty queues, so the size assertion below does not pick up
# jobs left behind by earlier examples.
before do
  Faktory::Testing.fake!
  Faktory::Job.clear_all
end

it 'enqueues one job per record' do
  records = create_list(:record, 5)

  described_class.enqueue_batch(records.map(&:id))

  expect(described_class.jobs.size).to eq(5)
  expect(described_class.jobs.map { |j| j['args'].first })
    .to match_array(records.map(&:id))
end

context 'in inline mode' do
  # RSpec runs `before` hooks inside `around` hooks, so an around-based
  # inline! would be switched back to fake mode by the outer `before`.
  # Enabling inline mode inside the example avoids that ordering problem.
  it 'processes all records' do
    records = create_list(:record, 3, status: 'pending')

    Faktory::Testing.inline! do
      described_class.enqueue_batch(records.map(&:id))
    end

    records.each(&:reload)
    expect(records.all? { |r| r.status == 'processed' }).to be true
  end
end
end
Go Client Testing
Faktory's Go client is well-suited for testing since Go's interface system enables easy mocking:
// worker/processor.go
package worker
import (
	"context"
	"encoding/json"
	"fmt"

	faktory "github.com/contribsys/faktory/client"
	worker "github.com/contribsys/faktory_worker_go"
)
// EmailProcessor holds the collaborators used to process an email job.
type EmailProcessor struct {
// mailer sends the outgoing message (ProcessEmailJob calls mailer.Send).
mailer MailService
// users resolves a user ID to a user record (ProcessEmailJob calls users.FindByID).
users UserRepository
// logger is not referenced by the code shown here.
logger Logger
}
// ProcessEmailJob looks up the user identified by args[0] and sends that user
// the template named by args[1]. It has no Faktory dependency, so it can be
// unit tested with mocked collaborators.
//
// args[0] must be a float64 (the type encoding/json produces for JSON numbers)
// and args[1] a string. Malformed arguments yield an error instead of a panic,
// so a bad payload fails the job cleanly.
func (p *EmailProcessor) ProcessEmailJob(ctx context.Context, args ...interface{}) error {
	if len(args) < 2 {
		return fmt.Errorf("expected at least 2 args, got %d", len(args))
	}

	rawID, ok := args[0].(float64)
	if !ok {
		return fmt.Errorf("expected numeric user ID as first arg, got %T", args[0])
	}
	templateName, ok := args[1].(string)
	if !ok {
		return fmt.Errorf("expected template name string as second arg, got %T", args[1])
	}
	userID := int64(rawID)

	user, err := p.users.FindByID(ctx, userID)
	if err != nil {
		return fmt.Errorf("user not found: %w", err)
	}

	return p.mailer.Send(ctx, MailMessage{
		To:       user.Email,
		Template: templateName,
	})
}

// worker/processor_test.go
package worker_test
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// MockMailService is a testify mock standing in for the mailer dependency.
type MockMailService struct {
	mock.Mock
}

// Send records the call and returns the error configured for it by the test.
func (m *MockMailService) Send(ctx context.Context, msg MailMessage) error {
	return m.Called(ctx, msg).Error(0)
}
// MockUserRepo is a testify mock standing in for the user repository.
type MockUserRepo struct {
	mock.Mock
}

// FindByID records the call and returns the user and error configured by the test.
func (m *MockUserRepo) FindByID(ctx context.Context, id int64) (*User, error) {
	args := m.Called(ctx, id)

	// Comma-ok assertion: a test that configures Return(nil, err) with an
	// untyped nil gets a nil *User back instead of a type-assertion panic.
	user, _ := args.Get(0).(*User)
	return user, args.Error(1)
}
func TestEmailProcessor_ProcessEmailJob(t *testing.T) {
tests := []struct {
name string
args []interface{}
setupMocks func(*MockMailService, *MockUserRepo)
expectError bool
errorMsg string
}{
{
name: "sends email successfully",
args: []interface{}{float64(1), "welcome"},
setupMocks: func(mailer *MockMailService, users *MockUserRepo) {
users.On("FindByID", mock.Anything, int64(1)).
Return(&User{ID: 1, Email: "alice@example.com"}, nil)
mailer.On("Send", mock.Anything, MailMessage{
To: "alice@example.com",
Template: "welcome",
}).Return(nil)
},
},
{
name: "fails with missing args",
args: []interface{}{float64(1)},
expectError: true,
errorMsg: "expected at least 2 args",
},
{
name: "fails when user not found",
args: []interface{}{float64(999), "welcome"},
setupMocks: func(mailer *MockMailService, users *MockUserRepo) {
users.On("FindByID", mock.Anything, int64(999)).
Return((*User)(nil), errors.New("user not found"))
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mailer := &MockMailService{}
users := &MockUserRepo{}
if tt.setupMocks != nil {
tt.setupMocks(mailer, users)
}
processor := &EmailProcessor{
mailer: mailer,
users: users,
}
err := processor.ProcessEmailJob(context.Background(), tt.args...)
if tt.expectError {
assert.Error(t, err)
if tt.errorMsg != "" {
assert.Contains(t, err.Error(), tt.errorMsg)
}
} else {
assert.NoError(t, err)
}
mailer.AssertExpectations(t)
users.AssertExpectations(t)
})
}
}
Faktory's language-agnostic nature means testing approaches vary by client. The consistent principle across all languages: extract the job's business logic into pure functions, test those separately from the Faktory client code, and reserve integration tests for the enqueueing/retry/DLQ behaviors that require real server interaction.