A concurrent event bus and event hub system for Go applications.
├── pkg/
│   ├── eventbus/          # Core event bus implementation
│   │   └── bus.go         # Type-safe event bus with handler management
│   └── hub/               # Hub management layer
│       └── hub.go         # Multi-bus coordination and convenience functions
├── internal/
│   └── errors/            # Internal error definitions and utilities
│       └── errors.go      # Error types and ID generation
├── examples/              # Usage examples
│   └── basic.go           # Basic usage patterns
└── doc.go                 # Package documentation
- Type Safety: Full generics support with no `any` or `interface{}` usage
- Concurrency Safe: All operations protected with proper synchronization
- Handler Lifecycle: Subscribe/unsubscribe with unique handler IDs
- Multiple Publish Modes: Synchronous and asynchronous event publishing
- Context Support: Cancellation and timeout handling
- Hub Management: Coordinate multiple typed event buses
- Clean Architecture: Separated concerns with clear boundaries
This directory contains various examples demonstrating different aspects of the Go Event System.
Each example is in its own directory to avoid naming conflicts. To run an example:
cd examples/basic && go run main.go
cd examples/advanced && go run main.go
cd examples/production && go run main.go
Demonstrates the fundamental usage of the event system with simple event handlers.
Shows more complex scenarios including concurrent publishing and handler management.
Comprehensive demonstration of handler return patterns including:
- Direct channel responses
- Callback-based responses
- Future-like patterns with sync.WaitGroup
- Concurrent processing
Additional examples of various implementation patterns.
Simplified examples focusing on handler return mechanisms.
- Type-safe event handling with generics
- Concurrent-safe operations
- Multiple response patterns without modifying core system
- Handler lifecycle management
- Context cancellation support
- Production-ready error handling
package main

import (
	"context"
	"fmt"

	"github.com/arash-mosavi/go-event-system/pkg/eventbus"
	"github.com/arash-mosavi/go-event-system/pkg/hub"
)

type UserEvent struct {
	UserID string
	Email  string
	Action string
}

func main() {
	// Create event hub
	eventHub := hub.NewHub()
	defer eventHub.Close()

	// Subscribe to events
	hub.Subscribe(eventHub, "users", "user.registered",
		func(ctx context.Context, event eventbus.Event[UserEvent]) error {
			fmt.Printf("User registered: %s\n", event.Data.Email)
			return nil
		})

	// Publish events
	userEvent := eventbus.Event[UserEvent]{
		Type: "user.registered",
		Data: UserEvent{
			UserID: "123",
			Email:  "user@example.com", // placeholder address
			Action: "registration",
		},
	}
	hub.Publish(eventHub, "users", context.Background(), userEvent)
}
// Create a typed bus directly
userBus := eventbus.NewBus[UserEvent]()
defer userBus.Close()

// Subscribe with handler ID for later removal
handlerID := userBus.Subscribe("user.login", func(ctx context.Context, event eventbus.Event[UserEvent]) error {
	fmt.Printf("User %s logged in\n", event.Data.UserID)
	return nil
})

// Publish synchronously
event := eventbus.Event[UserEvent]{
	Type: "user.login",
	Data: UserEvent{UserID: "user123"},
}
userBus.Publish(context.Background(), event)

// Publish asynchronously
userBus.PublishAsync(context.Background(), event)

// Remove specific handler
userBus.Unsubscribe("user.login", handlerID)
While the core EventBus system is designed for fire-and-forget event handling, you can implement various patterns to handle scenarios where you need return values, responses, or coordination between handlers.
Include callback functions directly in your event data for immediate responses:
type CallbackEvent struct {
	Data     string
	Callback func(result string, err error)
}

// Handler processes and calls back immediately
hub.Subscribe(eventHub, "callbacks", "process", func(ctx context.Context, event eventbus.Event[CallbackEvent]) error {
	result := processData(event.Data.Data)
	event.Data.Callback(result, nil)
	return nil
})

// Usage
event := eventbus.Event[CallbackEvent]{
	Type: "process",
	Data: CallbackEvent{
		Data: "input",
		Callback: func(result string, err error) {
			fmt.Printf("Result: %s\n", result)
		},
	},
}
hub.Publish(eventHub, "callbacks", context.Background(), event)
Implement request-response flows using separate event types and correlation IDs:
type UserLookupRequest struct {
	RequestID string
	UserID    string
}

type UserLookupResponse struct {
	RequestID string
	UserData  map[string]interface{}
	Error     string
}

// Service handler
hub.Subscribe(eventHub, "requests", "user.lookup", func(ctx context.Context, event eventbus.Event[UserLookupRequest]) error {
	req := event.Data
	user, err := getUserData(req.UserID)
	response := UserLookupResponse{
		RequestID: req.RequestID,
		UserData:  user,
	}
	if err != nil {
		response.Error = err.Error()
	}
	responseEvent := eventbus.Event[UserLookupResponse]{
		Type: "user.lookup.response",
		Data: response,
	}
	return hub.Publish(eventHub, "responses", ctx, responseEvent)
})
Coordinate multiple processing phases using event chains:
type ProcessingRequest struct {
	ID    string
	Phase string // "validate", "transform", "store"
	Data  string
}

hub.Subscribe(eventHub, "processing", "process.request", func(ctx context.Context, event eventbus.Event[ProcessingRequest]) error {
	req := event.Data

	// Process current phase
	result := processPhase(req.Phase, req.Data)

	// Determine next phase
	nextPhase := getNextPhase(req.Phase)
	if nextPhase != "" {
		nextRequest := ProcessingRequest{
			ID:    req.ID,
			Phase: nextPhase,
			Data:  result,
		}
		nextEvent := eventbus.Event[ProcessingRequest]{
			Type: "process.request",
			Data: nextRequest,
		}
		return hub.Publish(eventHub, "processing", ctx, nextEvent)
	}

	// Final phase - publish completion
	return publishCompletion(req.ID, result)
})
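The handler above calls `getNextPhase` without defining it. A minimal sketch of that helper, assuming the three phases named in the `Phase` field comment run in the order listed, could look like this (`phaseOrder` is a hypothetical name):

```go
package main

import "fmt"

// phaseOrder lists the phases in execution order, taken from the comment on
// ProcessingRequest.Phase. The ordering itself is an assumption.
var phaseOrder = []string{"validate", "transform", "store"}

// getNextPhase returns the phase that follows current, or "" when current
// is the last phase or is not a known phase.
func getNextPhase(current string) string {
	for i, phase := range phaseOrder {
		if phase == current && i+1 < len(phaseOrder) {
			return phaseOrder[i+1]
		}
	}
	return ""
}

func main() {
	// Walk the whole chain starting from the first phase.
	for phase := "validate"; phase != ""; phase = getNextPhase(phase) {
		fmt.Println(phase)
	}
}
```

Returning an empty string for unknown phases ends the chain instead of looping, which keeps a malformed request from republishing itself indefinitely.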
Collect results from multiple handlers before proceeding:
type AggregationRequest struct {
	RequestID     string
	SubRequests   []string
	ExpectedCount int
}

type SubResponse struct {
	ParentID string
	SubID    string
	Result   string
}

// Use a coordinator to collect multiple responses
type ResponseAggregator struct {
	responses map[string]map[string]string // [requestID][subID]result
	expected  map[string]int               // [requestID]ExpectedCount, recorded when the AggregationRequest is received
	mutex     sync.RWMutex
}

func (r *ResponseAggregator) handleSubResponse(ctx context.Context, event eventbus.Event[SubResponse]) error {
	resp := event.Data
	r.mutex.Lock()
	defer r.mutex.Unlock()

	if r.responses[resp.ParentID] == nil {
		r.responses[resp.ParentID] = make(map[string]string)
	}
	r.responses[resp.ParentID][resp.SubID] = resp.Result

	// Check if all responses collected; skip requests with no recorded count
	if want, ok := r.expected[resp.ParentID]; ok && len(r.responses[resp.ParentID]) >= want {
		return r.publishAggregatedResult(resp.ParentID)
	}
	return nil
}
Implement robust error handling with retry logic:
type RetryableRequest struct {
	RequestID    string
	Data         string
	MaxRetries   int
	CurrentRetry int
}

hub.Subscribe(eventHub, "retry", "process", func(ctx context.Context, event eventbus.Event[RetryableRequest]) error {
	req := event.Data

	// processRequest is assumed to return the processed result along with an error
	result, err := processRequest(req.Data)
	if err != nil && req.CurrentRetry < req.MaxRetries {
		// Retry with exponential backoff: 100ms, 200ms, 400ms, ...
		retryDelay := time.Duration(1<<uint(req.CurrentRetry)) * 100 * time.Millisecond
		go func() {
			time.Sleep(retryDelay)
			retryReq := req
			retryReq.CurrentRetry++
			retryEvent := eventbus.Event[RetryableRequest]{
				Type: "process",
				Data: retryReq,
			}
			hub.PublishAsync(eventHub, "retry", context.Background(), retryEvent)
		}()
		return err // Report this attempt's failure; the retry is already scheduled
	}
	if err != nil {
		return publishFailure(req.RequestID, err)
	}
	return publishSuccess(req.RequestID, result)
})
Implement circuit breaker logic to handle failing services:
type CircuitBreaker struct {
	failures     int
	maxFailures  int
	resetTimeout time.Duration
	nextAttempt  time.Time
	state        string // "closed", "open", "half-open"
	mutex        sync.RWMutex
}

func (cb *CircuitBreaker) handleRequest(ctx context.Context, event eventbus.Event[ServiceRequest]) error {
	if !cb.canExecute() {
		return publishError(event.Data.ID, "Circuit breaker is open")
	}

	err := processServiceRequest(event.Data)
	if err != nil {
		cb.onFailure()
		return err
	}

	cb.onSuccess()
	return nil
}
- Use Correlation IDs: Always include unique request IDs for tracking requests and responses
- Handle Timeouts: Implement timeouts for request-response patterns to prevent deadlocks
- Error Propagation: Design clear error handling strategies for each pattern
- Resource Cleanup: Properly clean up channels, goroutines, and other resources
- Monitoring: Add metrics and logging to track request flows and error rates
- Testing: Test timeout scenarios, error conditions, and race conditions
- Documentation: Document the expected flow and error conditions for each pattern
- Callback Pattern: Fastest, no additional allocations
- Request-Response: Moderate overhead due to correlation tracking
- Multi-Phase: Higher latency due to sequential processing
- Aggregation: Memory overhead scales with concurrent requests
- Retry/Circuit Breaker: Additional overhead for failure tracking
Choose the appropriate pattern based on your specific requirements for consistency, performance, and complexity.
The system is optimized for high-throughput scenarios:
- Subscribe: ~116ns/op with minimal allocations
- Single Handler Publish: ~551ns/op
- Hub Access: ~28ns/op with zero allocations
- Concurrent Operations: Full thread safety with race detector validation
# Run tests for all packages
go test ./...
# Run specific package tests
go test ./pkg/eventbus
go test ./pkg/hub
# Run examples
go run examples/basic.go
Core event bus implementation providing type-safe event handling for a single event type.
Management layer that coordinates multiple typed event buses with convenience functions.
Internal error definitions and utilities, including unique ID generation for handlers.
- Type Safety: No runtime type assertions, full compile-time safety
- Concurrency: All operations are thread-safe without compromising performance
- Clean Architecture: Clear separation between core logic and management layers
- Production Ready: No decorative code, optimized for real-world usage
- Testability: Comprehensive test coverage with performance benchmarks