A simple, easy-to-understand task queue and background job processing package using Redis. Built on top of hibiken/asynq with a clean abstraction layer that allows you to switch implementations without changing your code.
- What is a Queue?
- Why Use a Queue?
- Core Concepts
- Features
- Installation
- Quick Start
- Package Structure
- API Reference
- Configuration
- Task Options
- Task States
- Middleware
- Complete Examples
- Real-World Use Cases
- Best Practices
- Running the Example
- Troubleshooting
A queue is like a line at a bank. When you have many tasks to do but can't do them all at once, you put them in a queue. The tasks will be processed one by one (or in parallel).
Simple Analogy:
- Queue = Laundry basket
- Worker = Washing machine
- Task = Dirty clothes
- Handler = Wash program (normal wash, quick wash, etc.)
- Redis = Storage for the basket
- Middleware = Extra features (fabric softener, fragrance, etc.)
Imagine you have a website that needs to:
- Send emails to 1000 people
- Process large images
- Generate time-consuming reports
If you do everything immediately, your website becomes slow and users have to wait a long time.
With a queue:
- Website immediately responds to user: "OK, will be processed"
- Heavy work is done in the background
- Users don't have to wait
Real-world benefits:
- Faster response times - Users don't wait for heavy tasks
- Better reliability - Failed tasks are automatically retried
- Scalability - Process multiple tasks in parallel
- Scheduling - Run tasks at specific times
- Priority handling - Important tasks processed first
Where you put tasks to be processed later.
Analogy: Like putting dirty clothes in a laundry basket.
Processes tasks from the queue.
Analogy: Like a washing machine that takes clothes from the basket and washes them.
A job that needs to be done.
Analogy: One set of clothes to be washed.
Code that explains how to process a task.
Analogy: The wash program in the washing machine (normal, quick, delicate, etc.).
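These four concepts map naturally onto plain Go constructs. A toy, in-memory sketch (deliberately not this package's API; no Redis, no retries) just to make the roles concrete:

```go
package main

import "fmt"

// Task: a job that needs to be done.
type Task struct {
	Name    string
	Payload string
}

// Handler: code that explains how to process a task.
type Handler func(payload string) error

// runQueue enqueues one task and processes it, returning what was handled.
func runQueue() []string {
	// Queue: where tasks wait to be processed (here, a buffered channel).
	queue := make(chan Task, 10)

	// Handlers registered per task name (the "wash programs").
	var handled []string
	handlers := map[string]Handler{
		"email:send": func(payload string) error {
			handled = append(handled, "email:send "+payload)
			return nil
		},
	}

	// Enqueue a task, then close the queue so the worker loop ends.
	queue <- Task{Name: "email:send", Payload: "user@example.com"}
	close(queue)

	// Worker: takes tasks off the queue and runs the matching handler.
	for task := range queue {
		if h, ok := handlers[task.Name]; ok {
			if err := h(task.Payload); err != nil {
				fmt.Println("task failed:", err)
			}
		}
	}
	return handled
}

func main() {
	fmt.Println(runQueue())
}
```

The real package plays the same roles, but the queue lives in Redis, workers run concurrently, and failed tasks are retried.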
- Abstracted Interface - Not dependent on any specific queue library
- Queue & Worker - Producer and consumer pattern
- Task Scheduling - Schedule tasks to be processed later
- Priority Queues - Multiple queues with different priorities
- Retry Mechanism - Automatic retry for failed tasks
- Unique Tasks - Prevent duplicate tasks
- Timeout & Deadline - Control task execution time
- Graceful Shutdown - Safe worker shutdown
- Type-Safe - Full Go type safety
- Middleware Support - Extensible with middleware functions
go get github.com/fatkulnurk/foundation/queue

Dependencies:
- Redis server (for storing the queue)
- Go 1.25 or higher
# Using Docker (easiest)
docker run -d -p 6379:6379 redis
# Or using Homebrew (Mac)
brew install redis
brew services start redis
# Or using apt (Linux)
sudo apt install redis-server
sudo systemctl start redis

package main
import (
"context"
"github.com/fatkulnurk/foundation/queue"
"github.com/redis/go-redis/v9"
)
func main() {
// Connect to Redis
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// Create queue
q, _ := queue.NewQueue(redisClient)
// Enqueue a task
q.Enqueue(context.Background(), "email:send", map[string]string{
"to": "user@example.com",
"subject": "Welcome!",
"body": "Welcome to our service!",
})
}

package main
import (
"context"
"encoding/json"
"github.com/fatkulnurk/foundation/queue"
"github.com/redis/go-redis/v9"
)
func main() {
// Connect to Redis
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// Create worker
cfg := &queue.Config{
Concurrency: 10,
}
w := queue.NewWorker(cfg, redisClient)
// Register handler
w.Register("email:send", func(ctx context.Context, payload []byte) error {
var email map[string]string
json.Unmarshal(payload, &email)
// Send email here
println("Sending email to:", email["to"])
return nil
})
// Start worker
w.Start()
}

queue/
├── queue.go # Interfaces & contracts
├── asynq.go # Implementation using asynq
├── config.go # Worker configuration
├── middleware.go # Additional features for handlers
├── example/ # Usage examples
│ ├── main.go # Example code
│ └── README.md # How to run examples
└── go.mod # Dependencies
Contains interfaces (contracts/rules) for the queue package.
Analogy: Like a manual book explaining what can be done.
What's inside:
- `Queue` interface: How to enqueue tasks
- `Worker` interface: How to process tasks
- `Handler`: Function that processes a task
- `Option`: Additional settings for tasks
Concrete implementation using the asynq library.
Analogy: If queue.go is the blueprint, then asynq.go is the actual working washing machine.
What's inside:
- `AsynqQueue`: Queue implementation using Redis
- `AsynqWorker`: Worker implementation using Redis
Settings for the worker.
Analogy: Like washing machine settings (spin speed, temperature, etc.).
What's inside:
- `Concurrency`: How many tasks can be processed simultaneously
- `Queues`: Queues with different priorities
- `StrictPriority`: Whether priority should be strict
- `ShutdownTimeout`: How long to wait before forcing shutdown
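A configuration touching all four fields might look like the sketch below. The field names are the ones listed above; treating `ShutdownTimeout` as a number of seconds is an assumption based on the "Default: 8 seconds" comment and is worth verifying against `config.go`.

```go
package main

import "fmt"

// Config mirrors the fields documented for this package's queue.Config,
// reproduced locally here so the sketch is self-contained.
type Config struct {
	Concurrency     int
	Queues          map[string]int
	StrictPriority  bool
	ShutdownTimeout int // assumed to be seconds
}

func main() {
	cfg := &Config{
		Concurrency:     20,                                              // up to 20 tasks in parallel
		Queues:          map[string]int{"critical": 6, "default": 3, "low": 1}, // weighted priorities
		StrictPriority:  false,                                           // weighted, not strict
		ShutdownTimeout: 10,                                              // wait 10s on Stop() (assumed unit)
	}
	fmt.Println(cfg.Concurrency, cfg.Queues["critical"], cfg.StrictPriority, cfg.ShutdownTimeout)
}
```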
Functions that wrap handlers to add features.
Analogy: Like extra features in a washing machine (fragrance, fabric softener, anti-wrinkle).
What's inside:
- `LoggingMiddleware`: Records logs
- `RecoveryMiddleware`: Catches errors/panics
- `RetryLoggingMiddleware`: Records retry attempts
- `TimeoutMiddleware`: Limits execution time
- `MetricsMiddleware`: Records statistics
- `ChainMiddleware`: Combines multiple middleware
type Queue interface {
// Enqueue adds a task to the queue
Enqueue(ctx context.Context, taskName string, payload any, opts ...Option) (*OutputEnqueue, error)
// GetTaskInfo retrieves information about a task by its ID
GetTaskInfo(ctx context.Context, taskID string) (*TaskInfo, error)
// Close closes the queue client connection
Close() error
}

Usage:
// Create queue
q, err := queue.NewQueue(redisClient)
// Enqueue task
result, err := q.Enqueue(ctx, "email:send", emailData)
// Get task info
taskInfo, err := q.GetTaskInfo(ctx, result.TaskID)
// Close connection
q.Close()

type Worker interface {
// Start starts the worker and begins processing tasks
Start() error
// Stop stops the worker gracefully
Stop()
// Register registers a handler for a specific task type
Register(taskType string, handler Handler) error
// RegisterWithMiddleware registers a handler with middleware functions
RegisterWithMiddleware(taskType string, handler Handler, middleware ...MiddlewareFunc) error
// GetTaskID retrieves the task ID from the context
GetTaskID(ctx context.Context) (string, bool)
// GetTaskInfo retrieves information about a task by its ID
GetTaskInfo(ctx context.Context, taskID string) (*TaskInfo, error)
}

Usage:
// Create worker
w := queue.NewWorker(cfg, redisClient)
// Register handler
w.Register("email:send", handlerFunc)
// Register with middleware
w.RegisterWithMiddleware("email:send", handlerFunc,
queue.LoggingMiddleware("email:send"),
queue.RecoveryMiddleware("email:send"),
)
// Start worker
w.Start()
// Stop worker
w.Stop()

type Handler func(ctx context.Context, payload []byte) error

A function that processes a task. It receives the payload (data) and returns an error if the task failed.
Example:
func emailHandler(ctx context.Context, payload []byte) error {
var email EmailData
if err := json.Unmarshal(payload, &email); err != nil {
return err
}
// Send email
return sendEmail(email)
}

type TaskInfo struct {
ID string // Unique task ID
Type string // Task type (email:send, etc.)
Payload []byte // Task data
State TaskState // Task status
Queue string // Queue name
MaxRetry int // Maximum retries
Retried int // Number of retries so far
LastError string // Last error message
CompletedAt *time.Time // When completed
NextProcessAt *time.Time // When will be processed
}

type Config struct {
// Concurrency is the maximum number of concurrent processing of tasks
// Default: 10
Concurrency int
// Queues is a map of queue names to their priority levels
// Higher priority queues will be processed more frequently
// Example: {"critical": 6, "default": 3, "low": 1}
Queues map[string]int
// StrictPriority indicates whether the queue priority should be treated strictly
// If true, tasks in higher priority queues are processed first
// Default: false
StrictPriority bool
// ShutdownTimeout is the duration to wait for workers to finish before forcing shutdown
// Default: 8 seconds
ShutdownTimeout int
}

cfg := &queue.Config{
Concurrency: 10,
}

cfg := &queue.Config{
Concurrency: 20,
Queues: map[string]int{
"critical": 6, // Processed 6x more frequently
"default": 3, // Processed 3x more frequently
"low": 1, // Processed least frequently
},
}

Analogy: The VIP line is served faster than the regular line.
cfg := &queue.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
StrictPriority: true, // Process critical until empty, then default, then low
}

Analogy:
- `true` = Serve VIP until empty, then serve regular
- `false` = Serve VIP more often, but still serve regular
Options are functions that configure task behavior. They use a flexible key-value pattern.
Retry the task if it fails.
queue.Enqueue(ctx, "email:send", payload,
queue.MaxRetry(3), // Retry up to 3 times
)

Analogy: If laundry fails, try washing it again, up to 3 times.
Put task in a specific priority queue.
queue.Enqueue(ctx, "payment:process", payload,
queue.QueueName("critical"), // High priority
)

Analogy: Put it in the VIP line instead of the regular line.
Maximum time allowed for task execution.
queue.Enqueue(ctx, "image:process", payload,
queue.Timeout(5*time.Minute), // Max 5 minutes
)

Analogy: If washing takes more than 5 minutes, force it to stop.
Process task after a certain duration.
queue.Enqueue(ctx, "reminder:send", payload,
queue.ProcessIn(24*time.Hour), // Process in 24 hours
)

Analogy: Set a timer on the washing machine to start in 24 hours.
Process task at a specific time.
tomorrow := time.Now().Add(24*time.Hour)
queue.Enqueue(ctx, "report:generate", payload,
queue.ProcessAt(tomorrow), // Process tomorrow
)

Analogy: Set the washing machine to start at 10 AM tomorrow.
Prevent duplicate tasks for a duration.
queue.Enqueue(ctx, "report:monthly", payload,
queue.Unique(1*time.Hour), // Unique for 1 hour
)

Analogy: If the same clothes are already in the machine, don't add them again.
Assign a custom ID to the task.
queue.Enqueue(ctx, "order:process", payload,
queue.TaskID("order-12345"), // Custom ID
)

The task must complete before this absolute time.
deadline := time.Now().Add(1*time.Hour)
queue.Enqueue(ctx, "urgent:task", payload,
queue.Deadline(deadline), // Must finish within 1 hour
)

How long to keep task data after completion.
queue.Enqueue(ctx, "log:cleanup", payload,
queue.Retention(7*24*time.Hour), // Keep for 7 days
)

Assign the task to a specific group.
queue.Enqueue(ctx, "user:sync", payload,
queue.Group("user-operations"),
)

queue.Enqueue(ctx, "important:task", payload,
queue.MaxRetry(3),
queue.Timeout(30*time.Second),
queue.QueueName("critical"),
queue.Unique(1*time.Hour),
)

Tasks can have different states during their lifecycle:
| State | Description | Analogy |
|---|---|---|
| `pending` | Waiting to be processed | In the basket, not washed yet |
| `active` | Currently being processed | In the washing machine now |
| `scheduled` | Scheduled for later | Scheduled for 10 AM |
| `retry` | Failed, will be retried | Failed, will be washed again |
| `archived` | Failed permanently | Failed completely, can't be washed |
| `completed` | Successfully completed | Clean and done |
Checking task state:
taskInfo, _ := q.GetTaskInfo(ctx, taskID)
fmt.Println(taskInfo.State) // "pending", "active", "completed", etc.

Middleware functions wrap handlers to add extra functionality.
Records logs when task starts and completes.
w.RegisterWithMiddleware("email:send", handler,
queue.LoggingMiddleware("email:send"),
)

Output:
[email:send] Task started
[email:send] Task completed in 1.2s
Analogy: Recording in a book "10:00 - Started washing, 10:30 - Finished washing".
Catches panics so the program doesn't crash.
w.RegisterWithMiddleware("email:send", handler,
queue.RecoveryMiddleware("email:send"),
)

Analogy: If the washing machine errors, it shuts down safely instead of exploding.
Records when a task will be retried.
w.RegisterWithMiddleware("email:send", handler,
queue.RetryLoggingMiddleware("email:send"),
)

Output:
[email:send] Task will be retried: connection timeout
Adds a timeout to task execution.
w.RegisterWithMiddleware("email:send", handler,
queue.TimeoutMiddleware(5*time.Minute),
)

Analogy: If washing takes more than 5 minutes, force it to stop.
Tracks task metrics (placeholder for actual metrics implementation).
w.RegisterWithMiddleware("email:send", handler,
queue.MetricsMiddleware("email:send"),
)

Analogy: Recording "Today: washed 10 loads, 9 successful, 1 failed".
Combines multiple middleware functions.
w.RegisterWithMiddleware("email:send", handler,
queue.ChainMiddleware(
queue.LoggingMiddleware("email:send"),
queue.RecoveryMiddleware("email:send"),
queue.TimeoutMiddleware(30*time.Second),
),
)

Analogy: Using logging + recovery + timeout all at once.
// Enqueue
q.Enqueue(ctx, "email:send", map[string]string{
"to": "user@example.com",
"subject": "Hello",
"body": "Hello World",
})
// Worker
w.Register("email:send", func(ctx context.Context, payload []byte) error {
var email map[string]string
json.Unmarshal(payload, &email)
return sendEmail(email)
})

q.Enqueue(ctx, "image:process", map[string]string{
"url": "https://example.com/image.jpg",
},
queue.MaxRetry(3),
queue.Timeout(5*time.Minute),
queue.QueueName("critical"),
)

// Send a reminder in 24 hours
q.Enqueue(ctx, "reminder:send", map[string]string{
"user_id": "user123",
"message": "Don't forget!",
},
queue.ProcessIn(24*time.Hour),
)

// Prevent duplicate report generation
q.Enqueue(ctx, "report:generate", map[string]string{
"report_id": "monthly-2024-11",
},
queue.Unique(1*time.Hour),
queue.TaskID("report-monthly-2024-11"),
)

cfg := &queue.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6, // Processed more frequently
"default": 3,
"low": 1, // Processed less frequently
},
}
worker := queue.NewWorker(cfg, redisClient)

// Register a handler with logging and recovery middleware
w.RegisterWithMiddleware("email:send",
func(ctx context.Context, payload []byte) error {
var email map[string]string
json.Unmarshal(payload, &email)
return sendEmail(email)
},
queue.LoggingMiddleware("email:send"),
queue.RecoveryMiddleware("email:send"),
queue.TimeoutMiddleware(30*time.Second),
)

// Enqueue task
result, _ := q.Enqueue(ctx, "email:send", emailPayload)
// Get task information
taskInfo, err := q.GetTaskInfo(ctx, result.TaskID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Task ID: %s\n", taskInfo.ID)
fmt.Printf("State: %s\n", taskInfo.State)
fmt.Printf("Queue: %s\n", taskInfo.Queue)
fmt.Printf("Retried: %d/%d\n", taskInfo.Retried, taskInfo.MaxRetry)
if taskInfo.NextProcessAt != nil {
fmt.Printf("Next process at: %s\n", taskInfo.NextProcessAt)
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
worker.Start()
}()
<-sigChan
worker.Stop() // Waits for running tasks to finish

// Enqueue 1000 emails
for _, user := range users {
q.Enqueue(ctx, "email:send", map[string]string{
"to": user.Email,
"subject": "Newsletter",
"body": "Check out our latest news!",
})
}
// User gets an immediate response; emails are sent in the background

// Upload image
q.Enqueue(ctx, "image:resize", map[string]string{
"url": uploadedImage,
},
queue.Timeout(5*time.Minute),
queue.MaxRetry(2),
)
// User doesn't wait for the resize to complete

// Generate monthly report
q.Enqueue(ctx, "report:monthly", map[string]string{
"month": "November",
},
queue.ProcessAt(tomorrow), // Process tomorrow
queue.Unique(24*time.Hour), // Prevent duplicates
)

// Schedule a nightly backup
q.Enqueue(ctx, "backup:database", map[string]string{
"database": "production",
},
queue.ProcessAt(midnight),
queue.QueueName("low"),
)

// Critical payment task
q.Enqueue(ctx, "payment:process", paymentData,
queue.QueueName("critical"),
queue.MaxRetry(3),
queue.Timeout(30*time.Second),
queue.Unique(5*time.Minute),
)

type EmailPayload struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
// Good
q.Enqueue(ctx, "email:send", EmailPayload{
To: "user@example.com",
Subject: "Welcome",
Body: "Hello!",
})

w.Register("task", func(ctx context.Context, payload []byte) error {
// Return error to trigger retry
if err := process(); err != nil {
return fmt.Errorf("processing failed: %w", err)
}
return nil
})

w.Register("task", func(ctx context.Context, payload []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
// Do work
}
return nil
})

queue.Enqueue(ctx, "long-task", payload,
queue.Timeout(10*time.Minute),
queue.MaxRetry(2),
)

// Critical tasks
queue.Enqueue(ctx, "payment:process", payload,
queue.QueueName("critical"))
// Low priority tasks
queue.Enqueue(ctx, "analytics:update", payload,
queue.QueueName("low"))

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
worker.Start()
}()
<-sigChan
worker.Stop() // Graceful shutdown

w.RegisterWithMiddleware("task", handler,
queue.LoggingMiddleware("task"),
queue.RecoveryMiddleware("task"),
queue.MetricsMiddleware("task"),
)

// After enqueuing
result, _ := q.Enqueue(ctx, "task", payload)
// Check status later
taskInfo, _ := q.GetTaskInfo(ctx, result.TaskID)
if taskInfo.State == queue.TaskStateArchived {
log.Printf("Task failed permanently: %s", taskInfo.LastError)
}The example/ directory contains a complete working example.
cd pkg/queue/example
go run main.go worker

Expected output:
=== Example: Worker Processing Tasks ===
Registering handlers for each task type...
All handlers registered successfully
Starting worker...
Press Ctrl+C to stop
cd pkg/queue/example
go run main.go

Expected output:
=== Example: Enqueuing Tasks ===
1. Enqueuing simple email task...
Task enqueued successfully: ID=abc123
2. Enqueuing email with retry and timeout...
Task enqueued: ID=def456 (critical queue, 3 retries, 30s timeout)
... and so on
Processing task ID: abc123
📧 Sending email to user@example.com: Welcome!
Email sent successfully to user@example.com
Task ID: def456
🔔 Sending notification to user user123: Your order has been shipped!
Notification sent successfully
... and so on
Solution:
- Make sure Redis is running: `redis-cli ping`
- Check the port: default is 6379
- Check firewall settings
Solution:
- Make sure the worker is running
- Check that a handler is registered for the task type
- Check the worker's error logs
Solution:
- Check error message in logs
- Make sure payload can be unmarshaled
- Check logic in handler
- Increase timeout if needed
# Enter Redis CLI
redis-cli
# View all keys
KEYS *
# View queue contents
LRANGE asynq:{default}:pending 0 -1
LRANGE asynq:{critical}:pending 0 -1
LRANGE asynq:{low}:pending 0 -1

# View active tasks
redis-cli
SMEMBERS asynq:{default}:active

# Clear all data (warning: deletes everything in Redis)
redis-cli FLUSHALL

Your Application
↓
Queue Interface (abstraction)
↓
AsynqQueue (implementation)
↓
hibiken/asynq (internal only)
↓
Redis
Key Point: Your code only depends on the Queue and Worker interfaces, not on asynq or any specific implementation.
Your code doesn't depend on a specific library (asynq). If you want to switch to another library tomorrow, just change the implementation, your code stays the same.
Uses Go's type system, so it's safer and your IDE can help with autocomplete.
Can add new options without changing existing code.
Can add features (logging, metrics, etc.) without changing handlers.
When shutting down the worker, it waits for running tasks to finish first.
Simple API that's easy to understand and use.
Built on top of battle-tested asynq library.
If you want to switch from asynq to another library:
- Create a new file (e.g., `rabbitmq.go`)
- Implement the `Queue` and `Worker` interfaces
- Update `NewQueue()` and `NewWorker()` to use the new implementation
- External code doesn't need to change!
You can create custom queue implementations by implementing the Queue and Worker interfaces.
type MyCustomQueue struct {
// Your fields
}
func (q *MyCustomQueue) Enqueue(ctx context.Context, taskType string, payload any, opts ...Option) (*OutputEnqueue, error) {
// Your implementation
return &OutputEnqueue{}, nil
}
func (q *MyCustomQueue) GetTaskInfo(ctx context.Context, taskID string) (*TaskInfo, error) {
// Your implementation
return &TaskInfo{}, nil
}
// Implement other Queue interface methods...

type MyCustomWorker struct {
// Your fields
}
func (w *MyCustomWorker) Register(taskType string, handler Handler) error {
// Your implementation
return nil
}
func (w *MyCustomWorker) RegisterWithMiddleware(taskType string, handler Handler, middleware ...MiddlewareFunc) error {
// Your implementation
return nil
}
func (w *MyCustomWorker) Start() error {
// Your implementation
return nil
}
func (w *MyCustomWorker) Stop() {
// Your implementation
}
// Implement other Worker interface methods...

func MyCustomMiddleware(taskType string) MiddlewareFunc {
return func(next Handler) Handler {
return func(ctx context.Context, payload []byte) error {
// Before task execution
log.Printf("Starting task: %s", taskType)
// Execute task
err := next(ctx, payload)
// After task execution
if err != nil {
log.Printf("Task failed: %s: %v", taskType, err)
} else {
log.Printf("Task completed: %s", taskType)
}
return err
}
}
}

// Use it
worker.RegisterWithMiddleware("email:send", handler, MyCustomMiddleware("email:send"))

The queue package is a task queue system that:
- Separates heavy work from user responses
- Can process many tasks in parallel
- Has features like retry, scheduling, priority, etc.
- Is easy to use and maintain
Complete Analogy:
- Queue = Laundry basket
- Worker = Washing machine
- Task = Dirty clothes
- Handler = Wash program
- Redis = Storage for the basket
- Middleware = Extra features (fragrance, fabric softener, etc.)
Now you can use the queue package for your own applications!