-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/bull mq integration 2 #2983
base: dev
Are you sure you want to change the base?
Conversation
|
WalkthroughThis pull request introduces a comprehensive implementation of a Redis-based queue system for managing webhooks in the workflows service. The changes include setting up a Docker Compose configuration for Redis, adding BullMQ and Bull Board for queue management, and refactoring webhook handling to support both synchronous and asynchronous processing. The implementation adds environment-based configuration for queue system enablement, introduces new services for incoming and outgoing webhooks, and provides a flexible infrastructure for job processing with error handling and logging. Changes
Sequence DiagramsequenceDiagram
participant Client
participant WebhookController
participant OutgoingWebhookService
participant OutgoingWebhookQueueService
participant Redis
alt Queue System Enabled
Client->>WebhookController: Send Webhook Request
WebhookController->>OutgoingWebhookQueueService: Add Job
OutgoingWebhookQueueService->>Redis: Enqueue Job
else Queue System Disabled
Client->>WebhookController: Send Webhook Request
WebhookController->>OutgoingWebhookService: Invoke Webhook
OutgoingWebhookService->>Client: Send Webhook Directly
end
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 20
🔭 Outside diff range comments (3)
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (1)
Line range hint
86-90
: Avoid logging sensitive data in error logsIn the error handling, the
this.logger.error
call logs the entiredata
object, which may contain sensitive or personal information. Logging sensitive data can lead to security vulnerabilities and PII leakage.Consider sanitizing the
data
object before logging or logging only non-sensitive information. For example:- this.logger.error('Webhook error data', { - data, - error, - webhook, - }); + this.logger.error('Webhook error data', { + error, + webhookId: webhook.id, + // Include any additional non-sensitive information if necessary + });🧰 Tools
🪛 Biome (1.9.4)
[error] 84-84: Catch clause variable type annotation must be 'any' or 'unknown' if specified.
(parse)
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
Line range hint
147-194
: Remove duplicate webhook sending logic.There's duplicate code for constructing the webhook payload and configuration. Consider extracting this logic into a separate method.
Example refactor:
private buildWebhookPayload(data: WebhookData, id: string, environment: string, apiVersion: string) { const { runtimeData, correlationId, entityId, ...restData } = data; const { createdAt, resolvedAt, workflowDefinitionId, id: runtimeDataId, ...restRuntimeData } = runtimeData; return { id, eventName: 'workflow.completed', apiVersion, // ... rest of the payload construction }; } private buildWebhookArgs(url: string, payload: any, webhookSharedSecret: string) { return { requestConfig: { url, method: 'POST', headers: {}, body: payload, timeout: this.configService.get('WEBHOOK_TIMEOUT') ?? 15_000, }, customerConfig: { webhookSharedSecret, }, }; }services/workflows-service/package.json (1)
Line range hint
1-1
: Update lockfile to match package.json dependencies.The pipeline is failing because the lockfile is out of sync with package.json.
Run the following command to update the lockfile:
pnpm install🧰 Tools
🪛 GitHub Actions: CI
[error] Lock file (pnpm-lock.yaml) is out of sync with package.json dependencies. The package specifications in the lockfile do not match the specifications in package.json. Run 'pnpm install' without the frozen-lockfile option to update the lock file.
🧹 Nitpick comments (18)
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (1)
15-15
: Consider removing the emptyWebhookHttpService
classThe
WebhookHttpService
class is currently empty and does not extend any other class or provide any implementation. If it is no longer needed, you may consider removing it to clean up the codebase.services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts (1)
19-20
: RedundantinitializeWorker()
callThe
initializeWorker()
method is called both in the base class constructor and the subclass constructor. This redundancy might lead to unexpected behavior or multiple initializations. Consider removing the call in the subclass if it's already handled by the base class.services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts (1)
98-110
: Specify the type ofwebhookArgs
for clarityAdding an explicit type annotation to
webhookArgs
improves readability and ensures type safety.Apply this diff to specify the type:
const webhookArgs = { + const webhookArgs: WebhookJobData = { requestConfig: {
services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts (1)
73-75
: Use generic log messages for better reusabilityThe log messages refer to "Webhook job", which may not be appropriate for all subclasses of the base class. Using generic terms like "Job" improves the base class's reusability.
Apply this diff to update the log messages:
this.logger.log(`Webhook job ${job.id} is active`); + this.logger.log(`Job ${job.id} is active`); this.logger.warn( - `Webhook job ${job?.id} failed. Attempt: ${currentAttempts}. Error: ${error.message}`, + `Job ${job?.id} failed. Attempt: ${currentAttempts}. Error: ${error.message}`, );Also applies to: 98-100
services/workflows-service/src/bull-mq/consts.ts (1)
8-13
: Consider optimizing retry configurations.The current retry settings might need adjustment:
- 10-15 retry attempts with exponential backoff could lead to very long processing times
- Using the same backoff delay (1000ms) for all queues might not be optimal for different types of operations
Consider:
- Reducing the number of attempts
- Adjusting backoff delays based on queue purpose
- Adding maximum delay to prevent excessive wait times
config: { - attempts: 15, + attempts: 5, backoff: { type: 'exponential', delay: 1000, + maxDelay: 60000, }, },Also applies to: 18-23, 28-33
services/workflows-service/src/bull-mq/incoming-webhook/incoming-webhook-queue.service.ts (1)
11-15
: Add health check method.Consider adding a health check method to monitor queue health and connection status.
constructor(protected readonly logger: AppLoggerService) { super(QUEUES.INCOMING_WEBHOOKS_QUEUE.name, logger); } + async isHealthy(): Promise<boolean> { + try { + const isPaused = await this.queue.isPaused(); + const workerInfo = await this.worker.getWorkerInfo(); + return !isPaused && !!workerInfo; + } catch (error) { + this.logger.error('Health check failed:', error); + return false; + } + }services/workflows-service/src/business-report/business-report-list.dto.ts (2)
7-18
: Consider adding validation constraints for pagination.While the DTO includes validation for
businessId
andsearch
, thepage
property lacks explicit validation constraints. Consider adding validation to ensure proper pagination parameters.export class BusinessReportListRequestParamDto { @IsOptional() @IsString() businessId?: string; @IsOptional() @ApiProperty({ type: String, required: false }) search?: string; @ApiProperty({ type: PageDto }) + @ValidateNested() + @Type(() => PageDto) page!: PageDto; }
28-37
: Consider adding example values for the data array.The response DTO is well-structured with appropriate Swagger decorations. Consider enhancing the
data
property's API documentation with example values for better API documentation.- @ApiProperty({ type: [BusinessReportDto] }) + @ApiProperty({ + type: [BusinessReportDto], + example: [{ + id: 'report-123', + status: 'COMPLETED', + // ... other example fields + }] + }) data!: BusinessReportDto[];services/workflows-service/src/business-report/business-report.module.ts (1)
11-11
: Consider restructuring to eliminate circular dependencies.The module has multiple circular dependencies that are being handled with
forwardRef
. While this works, it might indicate a need to restructure the modules to have clearer boundaries and responsibilities.Consider:
- Extracting shared functionality into a separate module
- Using events/message bus for cross-module communication
- Implementing the mediator pattern
services/workflows-service/src/business-report/business-report.dto.ts (2)
11-23
: Consider adding URL validation.The WebsiteDto should include URL format validation to ensure data integrity.
export class WebsiteDto { @ApiProperty({ type: String }) id!: string; @ApiProperty({ type: String }) + @IsUrl({ + require_tld: true, + require_protocol: true, + }) url!: string; @ApiProperty({ type: String }) createdAt!: string; @ApiProperty({ type: String }) updatedAt!: string; }
65-67
: Consider typing the data property more strictly.Using
Record<string, unknown>
for the data property might be too permissive and could lead to runtime errors. Consider defining a more specific type or interface for the expected data structure.- @ApiProperty({ type: Object }) - data!: Record<string, unknown>; + @ApiProperty({ + type: 'object', + additionalProperties: true, + description: 'Dynamic data specific to the report type' + }) + data!: Record<string, string | number | boolean | null>;services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts (1)
10-24
: Consider adding request validation and timeout constraints.While the type definitions are comprehensive, consider adding:
- URL validation to ensure it's a valid HTTP/HTTPS URL
- Reasonable constraints on the timeout value to prevent excessive waiting
requestConfig: { - url: string; + url: z.string().url(); method: Method; headers?: Record<string, string>; body?: AnyRecord | string; - timeout?: number; + timeout?: z.number().min(1000).max(30000); };services/workflows-service/src/webhooks/incoming/incoming-webhooks.module.ts (1)
41-41
: Consider organizing providers into feature modules.The module has a large number of providers and dependencies. Consider:
- Grouping related providers into feature modules
- Using forwardRef only where necessary to break circular dependencies
Also applies to: 79-79, 82-82, 84-84
services/workflows-service/src/env.ts (1)
96-106
: Enhance Redis configuration validation.Consider the following improvements:
- Add port number validation
- Add database number validation
- Consider using URL format for Redis connection string
- REDIS_PORT: z.string().transform(value => Number(value)), + REDIS_PORT: z.string().transform(value => Number(value)) + .refine(port => port > 0 && port < 65536, 'Port must be between 1 and 65535'), - REDIS_DB: z - .string() - .transform(value => Number(value)) - .optional(), + REDIS_DB: z.string() + .transform(value => Number(value)) + .refine(db => db >= 0 && db <= 15, 'Redis DB must be between 0 and 15') + .optional(),services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
134-145
: Extract timeout to configuration.The timeout value is hardcoded. Consider moving it to the configuration service for better maintainability.
- timeout: 15_000, + timeout: this.configService.get('WEBHOOK_TIMEOUT') ?? 15_000,services/workflows-service/src/events/document-changed-webhook-caller.ts (1)
206-217
: Consider adding retry configuration.The webhook configuration is well-structured but lacks retry settings which are crucial for handling temporary network issues.
Add retry configuration to improve reliability:
const webhookArgs = { requestConfig: { url, method: 'POST', headers: {}, body: payload, timeout: 15_000, + retry: { + maxRetries: 3, + initialDelayMs: 1000, + backoffFactor: 2, + }, }, customerConfig: { webhookSharedSecret, }, } as const;services/workflows-service/docker-compose.redis.yml (1)
1-22
: LGTM! Consider enhancing security with additional Redis configuration.The Redis configuration looks good, particularly:
- Using the lightweight Alpine-based image
- Implementing password authentication
- Configuring data persistence with AOF
- Using local driver for volume (as per previous feedback)
Consider adding these security-focused Redis configurations:
command: > --requirepass ${REDIS_PASSWORD} --appendonly yes + --protected-mode yes + --tcp-backlog 128 + --maxmemory-policy allkeys-lru + --maxmemory 100mbservices/workflows-service/package.json (1)
60-60
: Consider using stricter version constraints for BullMQ dependencies.The caret (
^
) version constraint allows minor version updates which could introduce breaking changes.Apply this diff to use exact versions:
- "@nestjs/bullmq": "^10.2.1", + "@nestjs/bullmq": "10.2.1", - "bullmq": "^5.13.2", + "bullmq": "5.13.2",Also applies to: 87-87
🧰 Tools
🪛 GitHub Actions: CI
[error] Lock file (pnpm-lock.yaml) is out of sync with package.json dependencies. The package specifications in the lockfile do not match the specifications in package.json. Run 'pnpm install' without the frozen-lockfile option to update the lock file.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (42)
apps/backoffice-v2/src/lib/blocks/components/EditableDetails/EditableDetails.tsx
(1 hunks)apps/backoffice-v2/src/lib/blocks/hooks/useDocumentBlocks/useDocumentBlocks.tsx
(1 hunks)apps/backoffice-v2/src/pages/MerchantMonitoring/components/MerchantMonitoringTable/columns.tsx
(1 hunks)apps/backoffice-v2/src/pages/MerchantMonitoring/get-merchant-monitoring-search-schema.ts
(1 hunks)apps/kyb-app/src/pages/CollectionFlow/CollectionFlow.tsx
(1 hunks)packages/ui/src/components/templates/report/adapters/report-adapter/report-adapter.ts
(1 hunks)packages/workflow-core/src/lib/plugins/external-plugin/vendor-consts.ts
(2 hunks)services/workflows-service/docker-compose.redis.yml
(1 hunks)services/workflows-service/package.json
(4 hunks)services/workflows-service/prisma/data-migrations
(1 hunks)services/workflows-service/src/alert/alert.module.ts
(3 hunks)services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts
(2 hunks)services/workflows-service/src/app.module.ts
(1 hunks)services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts
(1 hunks)services/workflows-service/src/bull-mq/bull-mq.module.ts
(1 hunks)services/workflows-service/src/bull-mq/consts.ts
(1 hunks)services/workflows-service/src/bull-mq/incoming-webhook/incoming-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/bull-mq/incoming-webhook/types/types.ts
(1 hunks)services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/bull-mq/outgoing-webhook/types/types.ts
(1 hunks)services/workflows-service/src/bull-mq/types.ts
(1 hunks)services/workflows-service/src/business-report/business-report-list.dto.ts
(1 hunks)services/workflows-service/src/business-report/business-report.dto.ts
(1 hunks)services/workflows-service/src/business-report/business-report.module.ts
(1 hunks)services/workflows-service/src/business-report/dto/create-business-report-batch-body.dto.ts
(1 hunks)services/workflows-service/src/business-report/dto/get-business-report.dto.ts
(1 hunks)services/workflows-service/src/env.ts
(1 hunks)services/workflows-service/src/events/document-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/events/workflow-completed-webhook-caller.ts
(3 hunks)services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/redis/const/redis-config.ts
(1 hunks)services/workflows-service/src/redis/redis.module.ts
(1 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.controller.ts
(2 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.module.ts
(3 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.service.ts
(1 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.module.ts
(1 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts
(1 hunks)services/workflows-service/src/worker-app.module.ts
(1 hunks)services/workflows-service/src/worker-main.ts
(1 hunks)services/workflows-service/src/workflow/workflow.module.ts
(2 hunks)services/workflows-service/src/workflow/workflow.service.ts
(1 hunks)services/workflows-service/src/workflow/workflow.service.unit.test.ts
(1 hunks)
✅ Files skipped from review due to trivial changes (4)
- services/workflows-service/src/redis/redis.module.ts
- services/workflows-service/src/webhooks/incoming/incoming-webhooks.service.ts
- apps/backoffice-v2/src/lib/blocks/components/EditableDetails/EditableDetails.tsx
- services/workflows-service/prisma/data-migrations
🧰 Additional context used
📓 Learnings (1)
services/workflows-service/docker-compose.redis.yml (1)
Learnt from: alonp99
PR: ballerine-io/ballerine#2735
File: services/workflows-service/docker-compose.redis.yml:17-23
Timestamp: 2024-11-12T05:57:16.983Z
Learning: For Redis volumes in local development, prefer using `driver: local` in Docker Compose configurations.
🪛 Biome (1.9.4)
services/workflows-service/src/workflow/workflow.service.ts
[error] 1599-1599: Shouldn't redeclare 'collectionFlow'. Consider to delete it or rename it.
'collectionFlow' is defined here:
(lint/suspicious/noRedeclare)
apps/backoffice-v2/src/lib/blocks/hooks/useDocumentBlocks/useDocumentBlocks.tsx
[error] 412-412: This is an unexpected use of the debugger statement.
Unsafe fix: Remove debugger statement
(lint/suspicious/noDebugger)
apps/backoffice-v2/src/pages/MerchantMonitoring/components/MerchantMonitoringTable/columns.tsx
[error] 32-32: Shouldn't redeclare 'MERCHANT_REPORT_STATUSES_MAP'. Consider to delete it or rename it.
'MERCHANT_REPORT_STATUSES_MAP' is defined here:
(lint/suspicious/noRedeclare)
[error] 33-33: Shouldn't redeclare 'MERCHANT_REPORT_TYPES_MAP'. Consider to delete it or rename it.
'MERCHANT_REPORT_TYPES_MAP' is defined here:
(lint/suspicious/noRedeclare)
🪛 GitHub Actions: CI
services/workflows-service/package.json
[error] Lock file (pnpm-lock.yaml) is out of sync with package.json dependencies. The package specifications in the lockfile do not match the specifications in package.json. Run 'pnpm install' without the frozen-lockfile option to update the lock file.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (25)
services/workflows-service/src/bull-mq/bull-mq.module.ts (1)
1-70
: LGTM! The module is well-structured and integrates BullMQ effectively.The code is well-organized and correctly sets up the BullMQ queues and Bull Board monitoring interface.
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (1)
50-63
: EnsureQUEUE_SYSTEM_ENABLED
is properly converted to a booleanWhen checking
env.QUEUE_SYSTEM_ENABLED
, ensure that the environment variable is correctly parsed as a boolean value, especially if the variable is set as a string (e.g.,'true'
or'false'
). This will prevent unexpected behavior due to truthy or falsy string values.Consider explicitly parsing the environment variable to a boolean, for example:
- if (env.QUEUE_SYSTEM_ENABLED) { + if (env.QUEUE_SYSTEM_ENABLED === 'true') {Alternatively, update the configuration to ensure
QUEUE_SYSTEM_ENABLED
is a boolean value.services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts (1)
12-14
: Code changes enhance modularity and flexibilityThe additions of imports, injected services, and the queuing logic based on
env.QUEUE_SYSTEM_ENABLED
enhance the modularity and configurability of webhook handling. The implementation aligns well with the application's architecture.Also applies to: 26-27, 98-114
packages/ui/src/components/templates/report/adapters/report-adapter/report-adapter.ts (3)
14-24
: Effective use of utility functiontoRiskLabels
The
toRiskLabels
function efficiently standardizes risk indicators into a consistent format, enhancing code reusability and maintainability.
26-33
: Consistent risk level normalization withnormalizeRiskLevel
The
normalizeRiskLevel
function provides a streamlined approach to mapping risk levels, ensuring consistent severity representation across the application.
35-185
: Comprehensive and robustreportAdapter.DEFAULT
implementationThe
reportAdapter.DEFAULT
method thoroughly processes and adapts the report data, handling various data sources with appropriate optional chaining and type safety. This enhances the robustness and reliability of the data transformation.services/workflows-service/src/bull-mq/types.ts (1)
1-4
: Well-definedTJobPayloadMetadata
typeThe
TJobPayloadMetadata
type is correctly defined with optional propertiesprojectId
andcustomerName
, enhancing type safety and clarity in job payload handling.services/workflows-service/src/bull-mq/incoming-webhook/types/types.ts (1)
1-5
: AppropriateIncomingWebhookData
interface definitionThe
IncomingWebhookData
interface is well-structured, clearly defining the expected properties for handling incoming webhooks. The inclusion of aservice
function property promotes flexibility in processing webhook payloads asynchronously.services/workflows-service/src/bull-mq/outgoing-webhook/types/types.ts (1)
1-3
: LGTM! Well-structured type definition.The use of the
Parameters
utility type to extract the parameter types fromOutgoingWebhooksService.invokeWebhook
is a great practice. This ensures type safety and automatic updates if the method signature changes.services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.module.ts (1)
5-10
: Verify HTTP module dependency.The module looks well-structured, but please verify if
HttpModule
is imported either here or withinOutgoingWebhooksService
for making HTTP webhook calls.services/workflows-service/src/business-report/dto/get-business-report.dto.ts (1)
6-28
: LGTM! Well-structured DTO with proper validation and documentation.The DTO implementation follows best practices:
- Comprehensive swagger documentation
- Proper validation decorators
- Type-safe orderBy field using template literals
apps/backoffice-v2/src/pages/MerchantMonitoring/get-merchant-monitoring-search-schema.ts (1)
6-28
: LGTM! Well-implemented search schema with proper type safety.The schema implementation is solid:
- Good use of type safety with satisfies operator
- Reasonable fallback for sortBy
- Clean extension of base schema
services/workflows-service/src/business-report/business-report-list.dto.ts (1)
20-26
: LGTM! Robust schema validation.The Zod schema properly validates pagination parameters with appropriate constraints:
- Coerces and validates numbers
- Ensures positive integers
- Sets a reasonable maximum page size
services/workflows-service/src/business-report/dto/create-business-report-batch-body.dto.ts (1)
18-36
: LGTM! Well-structured enum validation.The type and version properties are properly validated using:
- Appropriate enum constraints
- Clear default values
- Comprehensive Swagger documentation
services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts (2)
1-8
: LGTM! Clean class setup with proper dependency injection.The class is correctly decorated with
@Injectable()
and uses constructor injection for the logger service.
27-40
: Enhance security headers and CORS configuration.The current implementation has a few security considerations:
Access-Control-Allow-Origin: '*'
is too permissive for a backend service- HMAC signature implementation looks secure, but consider adding a timestamp to prevent replay attacks
services/workflows-service/src/workflow/workflow.module.ts (1)
49-50
: LGTM! Verify BullMQ configuration.The integration of BullMQ and OutgoingWebhooks modules looks good. The modules are properly imported and added to the module's dependencies.
Let's verify the BullMQ configuration:
Also applies to: 68-69
✅ Verification successful
BullMQ integration verified and properly configured ✅
The BullMQ setup includes:
- Redis connection configuration
- Queue registration with Dead Letter Queue (DLQ) support
- BullBoard monitoring UI at
/queues
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for BullMQ configuration in the codebase # Check for Redis configuration which is required for BullMQ rg -l "createBullBoard|BullModule.forRoot|registerQueue" -g "*.ts"Length of output: 125
Script:
#!/bin/bash # Check the content of bull-mq.module.ts cat services/workflows-service/src/bull-mq/bull-mq.module.ts # Also check for Redis configuration in environment files rg -l "REDIS" -g "*.{ts,env}*"Length of output: 2376
services/workflows-service/src/app.module.ts (1)
135-135
: Verify the removal of WebhooksModule.The replacement of WebhooksModule with BullMqModule looks good, but we should ensure all webhook functionality has been properly migrated.
Let's verify no critical webhook functionality is broken:
✅ Verification successful
WebhooksModule removal is safe - functionality preserved in specialized modules
The webhook functionality has been properly migrated and modularized into IncomingWebhooksModule and OutgoingWebhooksModule, with complete BullMQ integration for queue management. All webhook capabilities (incoming endpoints, outgoing notifications, queue processing) are maintained through the new architecture.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for any remaining WebhooksModule dependencies # Search for any remaining references to WebhooksModule rg "WebhooksModule|@UseWebhooks|@Webhook" -g "*.ts" # Search for any webhook-related services that might need updating rg "implements.*WebhookHandler|extends.*WebhookBase" -g "*.ts"Length of output: 1443
Script:
#!/bin/bash # Search for any direct usage of WebhooksModule (not OutgoingWebhooksModule or IncomingWebhooksModule) rg "WebhooksModule(?!(\.|\s*}|,))" -g "*.ts" # Check for any webhook-related services or controllers that might be affected ast-grep --pattern 'class $_ { $$$ @Webhook($$$) $$$ }'Length of output: 357
Script:
#!/bin/bash # Search for WebhooksModule (exact match) rg "\bWebhooksModule\b" -g "*.ts" # Search for webhook decorators and implementations rg "@Webhook|webhook" -i -g "*.ts" -A 2Length of output: 65822
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
106-132
: LGTM! Clear payload structure.The new payload structure is well-organized and properly separates workflow metadata from the actual data.
services/workflows-service/src/events/document-changed-webhook-caller.ts (2)
182-204
: LGTM! Well-structured webhook payload.The payload structure is comprehensive and includes all necessary information for webhook processing.
219-224
: LGTM! Good separation of concerns.The conditional logic for queue-based processing is clean and follows the single responsibility principle.
packages/workflow-core/src/lib/plugins/external-plugin/vendor-consts.ts (1)
661-705
: LGTM! Good UTC time handling.The implementation properly handles UTC time conversion for the UBO vendor test configuration.
services/workflows-service/src/webhooks/incoming/incoming-webhooks.controller.ts (1)
1-8
: LGTM! Clean refactoring of webhook handling.The changes improve code organization by:
- Separating incoming webhook handling into its own controller
- Updating import paths to reflect the new structure
- Maintaining consistent functionality while improving modularity
services/workflows-service/package.json (2)
8-8
: LGTM! Setup script updated to manage Redis service.The setup script correctly orchestrates the database and Redis services.
🧰 Tools
🪛 GitHub Actions: CI
[error] Lock file (pnpm-lock.yaml) is out of sync with package.json dependencies. The package specifications in the lockfile do not match the specifications in package.json. Run 'pnpm install' without the frozen-lockfile option to update the lock file.
35-36
: LGTM! Redis Docker management scripts added.The scripts provide clear commands for managing the Redis service lifecycle.
🧰 Tools
🪛 GitHub Actions: CI
[error] Lock file (pnpm-lock.yaml) is out of sync with package.json dependencies. The package specifications in the lockfile do not match the specifications in package.json. Run 'pnpm install' without the frozen-lockfile option to update the lock file.
export const validate = async (config: Record<string, unknown>) => { | ||
const zodEnvSchema = z | ||
.object(serverEnvSchema) | ||
.refine(data => data.HASHING_KEY_SECRET || data.HASHING_KEY_SECRET_BASE64, { | ||
message: 'At least one of HASHING_KEY_SECRET or HASHING_KEY_SECRET_BASE64 should be present', | ||
path: ['HASHING_KEY_SECRET', 'HASHING_KEY_SECRET_BASE64'], | ||
}); | ||
|
||
const result = zodEnvSchema.safeParse(config); | ||
|
||
if (!result.success) { | ||
const errors = result.error.errors.map(zodIssue => ({ | ||
message: `❌ ${zodIssue.message}`, | ||
path: zodIssue.path.join('.'), // Backwards compatibility - Legacy code message excepts array | ||
})); | ||
|
||
throw new Error(JSON.stringify(errors, null, 2)); | ||
} | ||
|
||
// validate salt value | ||
await hashKey('check salt value'); | ||
|
||
return result.data; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure the validate
function in ConfigModule.forRoot
is synchronous
The validate
function provided to ConfigModule.forRoot
should be synchronous. Declaring it as an async
function and using await
inside may cause unexpected behavior since ConfigModule.forRoot
expects validate
to return synchronously. Asynchronous operations inside validate
will not complete as expected.
To fix this issue, consider making the validate
function synchronous and handle any asynchronous validation outside of it. For example, you can perform the salt value validation in a separate initialization step.
async handleJob(job: Job<TJobArgs>) { | ||
this.logger.log(`Processing webhook job ${job.id}`); | ||
|
||
const response = await this.webhookService.invokeWebhook({ | ||
...job.data.jobData, | ||
}); | ||
|
||
this.logger.log(`Webhook job ${job.id} completed with status: ${response.status}`); | ||
|
||
if (response.status >= 200 && response.status < 300) { | ||
return; | ||
} | ||
|
||
await this.handleRetryStrategy(response.status, job); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle exceptions from invokeWebhook
to prevent job crashes
The invokeWebhook
method may throw exceptions, but there is no error handling around its invocation. Without a try-catch
block, any errors will cause the job to fail without proper logging or retry logic.
Apply this diff to handle exceptions appropriately:
async handleJob(job: Job<TJobArgs>) {
this.logger.log(`Processing webhook job ${job.id}`);
+ try {
const response = await this.webhookService.invokeWebhook({
...job.data.jobData,
});
this.logger.log(`Webhook job ${job.id} completed with status: ${response.status}`);
if (response.status >= 200 && response.status < 300) {
return;
}
await this.handleRetryStrategy(response.status, job);
+ } catch (error) {
+ this.logger.error(`Error processing webhook job ${job.id}: ${error.message}`);
+ await this.handleRetryStrategy(500, job);
+ }
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async handleJob(job: Job<TJobArgs>) { | |
this.logger.log(`Processing webhook job ${job.id}`); | |
const response = await this.webhookService.invokeWebhook({ | |
...job.data.jobData, | |
}); | |
this.logger.log(`Webhook job ${job.id} completed with status: ${response.status}`); | |
if (response.status >= 200 && response.status < 300) { | |
return; | |
} | |
await this.handleRetryStrategy(response.status, job); | |
} | |
async handleJob(job: Job<TJobArgs>) { | |
this.logger.log(`Processing webhook job ${job.id}`); | |
try { | |
const response = await this.webhookService.invokeWebhook({ | |
...job.data.jobData, | |
}); | |
this.logger.log(`Webhook job ${job.id} completed with status: ${response.status}`); | |
if (response.status >= 200 && response.status < 300) { | |
return; | |
} | |
await this.handleRetryStrategy(response.status, job); | |
} catch (error) { | |
this.logger.error(`Error processing webhook job ${job.id}: ${error.message}`); | |
await this.handleRetryStrategy(500, job); | |
} | |
} |
private async handleRetryStrategy(status: number, job: Job<TJobArgs>) { | ||
if (job.opts.attempts && job.attemptsMade >= job.opts.attempts) { | ||
this.logger.warn(`Job ${job.id} reached the maximum retry attempts (${job.opts.attempts})`); | ||
throw new Error(`Job ${job.id} failed after reaching max attempts`); | ||
} | ||
|
||
let delayMs: number; | ||
|
||
switch (status) { | ||
case HttpStatusCode.TooManyRequests: | ||
case HttpStatusCode.InternalServerError: | ||
case HttpStatusCode.BadGateway: | ||
case HttpStatusCode.ServiceUnavailable: | ||
case HttpStatusCode.GatewayTimeout: | ||
delayMs = Math.pow(2, job.attemptsMade + 1) * 1000; // Exponential backoff | ||
break; | ||
|
||
case HttpStatusCode.RequestTimeout: | ||
delayMs = 1000 * 60 * (job.attemptsMade + 1); // Linear backoff in minutes | ||
break; | ||
|
||
case HttpStatusCode.BadRequest: | ||
throw new Error(`Webhook job failed with status ${status}: Bad Request`); | ||
|
||
default: | ||
throw new Error(`Webhook job failed with status ${status}: Unexpected Error`); | ||
} | ||
|
||
await this.retryJob(job, delayMs); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Utilize BullMQ's built-in retry and backoff features
The custom retry logic in handleRetryStrategy
and retryJob
duplicates functionality that BullMQ provides natively. Leveraging BullMQ's attempts
and backoff
options simplifies the code and reduces complexity.
Consider specifying attempts
and backoff
in the job options or queue configuration and remove the custom retry logic. Apply this diff to simplify the handleJob
method:
if (response.status >= 200 && response.status < 300) {
return;
}
- await this.handleRetryStrategy(response.status, job);
+ throw new Error(`Webhook job failed with status ${response.status}`);
And remove the handleRetryStrategy
and retryJob
methods if they are no longer needed.
Committable suggestion skipped: line range outside the PR's diff.
protected setQueueListener<T extends keyof QueueListener<any, any, any>>({ | ||
queue, | ||
eventName, | ||
listener, | ||
}: { | ||
queue: Queue | undefined; | ||
eventName: T; | ||
listener: QueueListener<any, any, any>[T]; | ||
}) { | ||
return async () => { | ||
queue?.removeAllListeners(eventName); | ||
queue?.on(eventName, listener); | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setQueueListener
does not attach the event listener
The setQueueListener
method returns an async function that is not called, so the event listener is never attached to the queue. This prevents the listener from functioning as intended.
Apply this diff to fix the issue:
protected setQueueListener<T extends keyof QueueListener<any, any, any>>({
queue,
eventName,
listener,
}: {
queue: Queue | undefined;
eventName: T;
listener: QueueListener<any, any, any>[T];
}) {
- return async () => {
queue?.removeAllListeners(eventName);
queue?.on(eventName, listener);
- };
}
By removing the returned function, the listener will be correctly attached when setQueueListener
is called.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
protected setQueueListener<T extends keyof QueueListener<any, any, any>>({ | |
queue, | |
eventName, | |
listener, | |
}: { | |
queue: Queue | undefined; | |
eventName: T; | |
listener: QueueListener<any, any, any>[T]; | |
}) { | |
return async () => { | |
queue?.removeAllListeners(eventName); | |
queue?.on(eventName, listener); | |
}; | |
} | |
protected setQueueListener<T extends keyof QueueListener<any, any, any>>({ | |
queue, | |
eventName, | |
listener, | |
}: { | |
queue: Queue | undefined; | |
eventName: T; | |
listener: QueueListener<any, any, any>[T]; | |
}) { | |
queue?.removeAllListeners(eventName); | |
queue?.on(eventName, listener); | |
} |
export const REDIS_CONFIG = { | ||
host: env.REDIS_HOST || 'localhost', | ||
port: env.REDIS_PORT || 7381, | ||
...(env.REDIS_PASSWORD ? { password: env.REDIS_PASSWORD } : {}), | ||
...(env.REDIS_DB ? { db: env.REDIS_DB } : {}), | ||
} satisfies RedisOptions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance Redis configuration with security and reliability settings.
While the basic configuration is good, consider adding these essential settings:
- The default port (7381) is non-standard - consider using the standard Redis port 6379
- Add connection timeout settings
- Consider TLS configuration for secure environments
- Add retry strategy for connection resilience
export const REDIS_CONFIG = {
host: env.REDIS_HOST || 'localhost',
- port: env.REDIS_PORT || 7381,
+ port: env.REDIS_PORT || 6379,
...(env.REDIS_PASSWORD ? { password: env.REDIS_PASSWORD } : {}),
...(env.REDIS_DB ? { db: env.REDIS_DB } : {}),
+ tls: env.REDIS_TLS_ENABLED ? {
+ rejectUnauthorized: true,
+ ca: env.REDIS_TLS_CA,
+ } : undefined,
+ connectTimeout: 10000,
+ maxRetriesPerRequest: 3,
+ retryStrategy: (times: number) => Math.min(times * 50, 2000),
} satisfies RedisOptions;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
export const REDIS_CONFIG = { | |
host: env.REDIS_HOST || 'localhost', | |
port: env.REDIS_PORT || 7381, | |
...(env.REDIS_PASSWORD ? { password: env.REDIS_PASSWORD } : {}), | |
...(env.REDIS_DB ? { db: env.REDIS_DB } : {}), | |
} satisfies RedisOptions; | |
export const REDIS_CONFIG = { | |
host: env.REDIS_HOST || 'localhost', | |
port: env.REDIS_PORT || 6379, | |
...(env.REDIS_PASSWORD ? { password: env.REDIS_PASSWORD } : {}), | |
...(env.REDIS_DB ? { db: env.REDIS_DB } : {}), | |
tls: env.REDIS_TLS_ENABLED ? { | |
rejectUnauthorized: true, | |
ca: env.REDIS_TLS_CA, | |
} : undefined, | |
connectTimeout: 10000, | |
maxRetriesPerRequest: 3, | |
retryStrategy: (times: number) => Math.min(times * 50, 2000), | |
} satisfies RedisOptions; |
@@ -205,6 +205,7 @@ export const CollectionFlow = withSessionProtected(() => { | |||
<div className="flex w-full justify-end"> | |||
<AppShell.LanguagePicker /> | |||
</div> | |||
<AppShell.Navigation /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate navigation component.
The AppShell.Navigation
component is duplicated, which could lead to UI inconsistencies and confusion.
Apply this diff to remove the duplicate:
- <AppShell.Navigation />
[COMPANY_SCREENING_VENDORS['test']]: (options: CompanySanctionsAsiaVerifyOptions) => ({ | ||
name: 'companySanctions', | ||
pluginKind: 'company-sanctions', | ||
vendor: 'test', | ||
url: { | ||
url: `{secret.UNIFIED_API_URL}/companies/{country}/{entity.data.companyName}/sanctions`, | ||
options: { | ||
country: options.defaultCountry ?? '{entity.data.country}', | ||
}, | ||
}, | ||
headers: { Authorization: 'Bearer {secret.UNIFIED_API_TOKEN}' }, | ||
method: 'GET' as const, | ||
displayName: 'Company Sanctions', | ||
persistResponseDestination: 'pluginsOutput.companySanctions', | ||
request: { | ||
transform: [ | ||
{ | ||
mapping: "{ vendor: 'test' }", | ||
transformer: 'jmespath', | ||
}, | ||
], | ||
}, | ||
response: { | ||
transform: [ | ||
{ | ||
mapping: | ||
"merge({ name: 'companySanctions', status: contains(['NOT_IMPLEMENTED', 'NOT_AVAILABLE'], reason) && 'CANCELED' || error != `null` && 'ERROR' || 'SUCCESS' }, @)", | ||
transformer: 'jmespath', | ||
}, | ||
{ | ||
mapping: [ | ||
{ | ||
method: 'setTimeToRecordUTC', | ||
source: 'invokedAt', | ||
target: 'invokedAt', | ||
}, | ||
], | ||
transformer: 'helper', | ||
}, | ||
], | ||
}, | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate vendor implementation.
The implementation of COMPANY_SCREENING_VENDORS['test']
is duplicated, which could lead to confusion and maintenance issues.
Remove the duplicate implementation (lines 510-551) as it's already defined above.
const collectionFlow = buildCollectionFlowState({ | ||
apiUrl: env.APP_API_URL, | ||
steps: uiDefinition?.definition | ||
? getOrderedSteps( | ||
(uiDefinition?.definition as Prisma.JsonObject)?.definition as Record< | ||
string, | ||
Record<string, unknown> | ||
>, | ||
{ finalStates: [...WORKFLOW_FINAL_STATES] }, | ||
).map(stepName => ({ | ||
stateName: stepName, | ||
})) | ||
: [], | ||
additionalInformation: { | ||
customerCompany: customer.displayName, | ||
}, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate variable declaration.
The collectionFlow
variable is declared twice, which is a syntax error and will cause runtime issues.
Remove the duplicate declaration (lines 1599-1615) as it's already defined above.
🧰 Tools
🪛 Biome (1.9.4)
[error] 1599-1599: Shouldn't redeclare 'collectionFlow'. Consider to delete it or rename it.
'collectionFlow' is defined here:
(lint/suspicious/noRedeclare)
import { | ||
MERCHANT_REPORT_STATUSES_MAP, | ||
MERCHANT_REPORT_TYPES_MAP, | ||
} from '@/domains/business-reports/constants'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate imports.
The constants MERCHANT_REPORT_STATUSES_MAP
and MERCHANT_REPORT_TYPES_MAP
are already imported at lines 17-20.
Apply this diff to remove the duplicate imports:
-import {
- MERCHANT_REPORT_STATUSES_MAP,
- MERCHANT_REPORT_TYPES_MAP,
-} from '@/domains/business-reports/constants';
🧰 Tools
🪛 Biome (1.9.4)
[error] 32-32: Shouldn't redeclare 'MERCHANT_REPORT_STATUSES_MAP'. Consider to delete it or rename it.
'MERCHANT_REPORT_STATUSES_MAP' is defined here:
(lint/suspicious/noRedeclare)
[error] 33-33: Shouldn't redeclare 'MERCHANT_REPORT_TYPES_MAP'. Consider to delete it or rename it.
'MERCHANT_REPORT_TYPES_MAP' is defined here:
(lint/suspicious/noRedeclare)
@@ -409,6 +409,7 @@ export const useDocumentBlocks = ({ | |||
} | |||
|
|||
if (ocrResult?.parsedData?.[title]) { | |||
debugger; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debugger statement.
Debugger statements should not be committed to production code.
Apply this diff to remove the debugger statement:
- debugger;
🧰 Tools
🪛 Biome (1.9.4)
[error] 412-412: This is an unexpected use of the debugger statement.
Unsafe fix: Remove debugger statement
(lint/suspicious/noDebugger)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (4)
services/workflows-service/src/bull-mq/bull-mq.module.ts (2)
12-28
: Consider aligning with arrow function consistency.
You’ve explicitly disabled theprefer-arrow/prefer-arrow-functions
rule forcomposeQueueAndDlqBoard
. If the rest of the codebase consistently uses arrow functions, consider removing the ESLint disable directive and converting this to an arrow function for consistency.
30-39
: Clarify the queue configuration logic.
composeInitiateQueueWithDlq
cleverly handles the creation of both the main queue and the DLQ in a concise way. However, the chained logical checks can be slightly opaque for newcomers. Adding a descriptive comment near thefilter(Boolean)
step would aid readability and maintainability.services/workflows-service/src/bull-mq/consts.ts (1)
3-37
: Centralize and document backoff and retry strategies.
TheDEFAULT
,INCOMING_WEBHOOKS_QUEUE
, andOUTGOING_WEBHOOKS_QUEUE
each define attempts and backoff. Consider placing these shared retry strategies in a separate config or constants file for consistency across all queues and future expansions.services/workflows-service/package.json (1)
8-8
: Consider improving script organization.The Redis management scripts could be better organized:
- Consider combining the Redis setup commands into a single script.
- Consider moving Redis management scripts to a separate namespace (e.g.,
redis:up
,redis:down
).- "setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed", - "docker:redis": "docker compose -f docker-compose.redis.yml up -d --wait", - "docker:redis:down": "docker compose -f docker-compose.redis.yml down --volumes", + "setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run redis:setup && npm run db:reset && npm run seed", + "redis:setup": "npm run redis:down && npm run redis:up", + "redis:up": "docker compose -f docker-compose.redis.yml up -d --wait", + "redis:down": "docker compose -f docker-compose.redis.yml down --volumes",Also applies to: 35-36
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (10)
services/workflows-service/package.json
(4 hunks)services/workflows-service/prisma/data-migrations
(1 hunks)services/workflows-service/src/alert/alert.module.ts
(3 hunks)services/workflows-service/src/app.module.ts
(3 hunks)services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts
(1 hunks)services/workflows-service/src/bull-mq/bull-mq.module.ts
(1 hunks)services/workflows-service/src/bull-mq/consts.ts
(1 hunks)services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/events/workflow-completed-webhook-caller.ts
(4 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- services/workflows-service/prisma/data-migrations
- services/workflows-service/src/bull-mq/outgoing-webhook/outgoing-webhook-queue.service.ts
- services/workflows-service/src/bull-mq/base/base-queue-worker.service.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: test_windows
- GitHub Check: test_linux
- GitHub Check: build (windows-latest)
- GitHub Check: spell_check
- GitHub Check: build (ubuntu-latest)
- GitHub Check: lint
- GitHub Check: Analyze (javascript)
🔇 Additional comments (5)
services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts (1)
10-24
: Avoid missing or undefined HTTP methods.
TheinvokeWebhook
method relies onrequestConfig.method
without enforcing a default or validating it. Consider adding a sanity check or default method to handle scenarios wheremethod
might not be supplied or is incorrectly spelled.services/workflows-service/src/bull-mq/consts.ts (1)
8-13
: Confirm that 15 retry attempts is optimal.
For theDEFAULT
queue, 15 attempts with exponential backoff can cause extended delays before the job finally fails or lands in the DLQ. Ensure that this is aligned with operational SLAs, as it may significantly postpone error handling for failing jobs.services/workflows-service/src/alert/alert.module.ts (1)
37-38
: Remove duplicate module entries.BullMqModule and OutgoingWebhooksModule are listed in both
imports
andproviders
arrays. They should only be inimports
.services/workflows-service/src/app.module.ts (1)
51-53
: LGTM!The new modules are correctly imported and added to the imports array.
Also applies to: 96-97, 139-139
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
150-152
: Add QUEUE_SYSTEM_ENABLED to environment configuration template.The QUEUE_SYSTEM_ENABLED environment variable is missing from the .env.example file.
BullModule.forRootAsync({ | ||
useFactory: () => { | ||
return { | ||
connection: { | ||
...REDIS_CONFIG, | ||
}, | ||
}; | ||
}, | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add fallback or retry on Redis connection issues.
Currently, BullModule.forRootAsync
uses only the basic REDIS_CONFIG
. If Redis is unavailable at startup or faces transient failures, the module may fail to initialize. To improve resilience, consider adding connection error handling or a retry strategy.
BullBoardModule.forRoot({ | ||
route: '/queues', | ||
adapter: ExpressAdapter, | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Secure access to Bull Board.
Exposing the Bull Board at /queues
can create security or privacy concerns in production. Consider restricting access via authentication, IP whitelisting, or at least an environment-based guard to prevent unauthorized queue monitoring.
const signedHeaders = { | ||
Accept: 'application/json', | ||
'Content-Type': 'application/json', | ||
'Access-Control-Allow-Origin': '*', | ||
...headers, | ||
...(body && customerConfig?.webhookSharedSecret | ||
? { | ||
'X-HMAC-Signature': sign({ | ||
payload: body, | ||
key: customerConfig.webhookSharedSecret, | ||
}), | ||
} | ||
: {}), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure consistent payload formats for signature generation.
When computing the HMAC signature, if body
is sometimes a string and sometimes an object, it’s crucial to ensure consistent serialization. Otherwise, the signature could become invalid. For clarity, make sure you serialize or transform body
into a consistent format prior to signing.
return await axios({ | ||
url, | ||
method, | ||
headers: signedHeaders, | ||
data: body, | ||
timeout: timeout || 15000, | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle non-2xx responses and exceptions gracefully.
Currently, await axios(...)
will throw on 4xx/5xx responses, but there is no retry, logging, or fallback mechanism. Un-commenting your existing error-handling block or implementing custom logic for non-2xx responses can improve observability and resilience.
// Omit from data properties already sent as part of the webhook payload | ||
const { runtimeData, correlationId, entityId, ...restData } = data; | ||
const { | ||
createdAt, | ||
resolvedAt, | ||
workflowDefinitionId, | ||
id: runtimeDataId, | ||
...restRuntimeData | ||
} = runtimeData; | ||
const payload = { | ||
id, | ||
eventName: 'workflow.completed', | ||
apiVersion, | ||
timestamp: new Date().toISOString(), | ||
workflowCreatedAt: createdAt, | ||
workflowResolvedAt: resolvedAt, | ||
workflowDefinitionId, | ||
workflowRuntimeId: runtimeDataId, | ||
workflowStatus: data.runtimeData.status, | ||
workflowFinalState: data.runtimeData.state, | ||
ballerineEntityId: entityId, | ||
correlationId, | ||
environment, | ||
data: { | ||
...restRuntimeData.context, | ||
}, | ||
}; | ||
|
||
const webhookArgs = { | ||
requestConfig: { | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: payload, | ||
timeout: 15_000, | ||
}, | ||
customerConfig: { | ||
webhookSharedSecret, | ||
}, | ||
} as const; | ||
|
||
if (env.QUEUE_SYSTEM_ENABLED) { | ||
return await this.outgoingWebhookQueueService.addJob(webhookArgs); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor duplicate code blocks.
The code block for queue system enabled check and payload construction is duplicated. Consider refactoring to eliminate the duplication:
- Move the payload construction to a separate method.
- Remove the duplicate queue system enabled check.
private async sendWebhook({
data,
webhook: { id, url, environment, apiVersion },
webhookSharedSecret,
}) {
this.logger.log('Sending webhook', { id, url });
- // Omit from data properties already sent as part of the webhook payload
- const { runtimeData, correlationId, entityId, ...restData } = data;
- const {
- createdAt,
- resolvedAt,
- workflowDefinitionId,
- id: runtimeDataId,
- ...restRuntimeData
- } = runtimeData;
- const payload = {
- id,
- eventName: 'workflow.completed',
- apiVersion,
- timestamp: new Date().toISOString(),
- workflowCreatedAt: createdAt,
- workflowResolvedAt: resolvedAt,
- workflowDefinitionId,
- workflowRuntimeId: runtimeDataId,
- workflowStatus: data.runtimeData.status,
- workflowFinalState: data.runtimeData.state,
- ballerineEntityId: entityId,
- correlationId,
- environment,
- data: {
- ...restRuntimeData.context,
- },
- };
+ const payload = this.buildWebhookPayload(data, id, apiVersion, environment);
const webhookArgs = {
requestConfig: {
url,
method: 'POST',
headers: {},
body: payload,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
} as const;
if (env.QUEUE_SYSTEM_ENABLED) {
return await this.outgoingWebhookQueueService.addJob(webhookArgs);
}
try {
- // Omit from data properties already sent as part of the webhook payload
- const { runtimeData, correlationId, entityId, childWorkflowsRuntimeData, ...restData } = data;
- const {
- createdAt,
- resolvedAt,
- workflowDefinitionId,
- id: runtimeDataId,
- ...restRuntimeData
- } = runtimeData;
- const payload = {
- id,
- eventName: 'workflow.completed',
- apiVersion,
- timestamp: new Date().toISOString(),
- workflowCreatedAt: createdAt,
- workflowResolvedAt: resolvedAt,
- workflowDefinitionId,
- workflowRuntimeId: runtimeDataId,
- workflowStatus: data.runtimeData.status,
- workflowFinalState: data.runtimeData.state,
- ballerineEntityId: entityId,
- correlationId,
- environment,
- data: {
- ...restRuntimeData.context,
- childWorkflowsRuntimeData,
- },
- };
- if (env.QUEUE_SYSTEM_ENABLED) {
- return await this.outgoingWebhookQueueService.addJob({
- requestConfig: {
- url,
- method: 'POST',
- headers: {},
- body: payload,
- timeout: 15_000,
- },
- customerConfig: {
- webhookSharedSecret,
- },
- });
- }
const res = await this.#__axios.post(url, payload, {
headers: {
'X-Authorization': webhookSharedSecret,
'X-HMAC-Signature': sign({ payload, key: webhookSharedSecret }),
},
});
Add a new private method to handle payload construction:
private buildWebhookPayload(
data: ExtractWorkflowEventData<'workflow.completed'>,
id: string,
apiVersion: string,
environment: string,
) {
const { runtimeData, correlationId, entityId, childWorkflowsRuntimeData } = data;
const {
createdAt,
resolvedAt,
workflowDefinitionId,
id: runtimeDataId,
...restRuntimeData
} = runtimeData;
return {
id,
eventName: 'workflow.completed',
apiVersion,
timestamp: new Date().toISOString(),
workflowCreatedAt: createdAt,
workflowResolvedAt: resolvedAt,
workflowDefinitionId,
workflowRuntimeId: runtimeDataId,
workflowStatus: data.runtimeData.status,
workflowFinalState: data.runtimeData.state,
ballerineEntityId: entityId,
correlationId,
environment,
data: {
...restRuntimeData.context,
...(childWorkflowsRuntimeData ? { childWorkflowsRuntimeData } : {}),
},
};
}
Also applies to: 184-197
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🔭 Outside diff range comments (1)
services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts (1)
Line range hint
19-19
: Remove unused axios instance and service.The
#__axios
private field andhttpService
are no longer used after the refactor to useOutgoingWebhooksService
.- #__axios: AxiosInstance; - constructor( - private httpService: HttpService, workflowEventEmitter: WorkflowEventEmitterService, // ... ) { - this.#__axios = this.httpService.axiosRef; // ... }Also applies to: 30-30
🧹 Nitpick comments (10)
services/workflows-service/src/bull-mq/bull-mq.module.ts (3)
9-9
: Naming consistency for QUEUES.While
QUEUES
is descriptive, consider renaming it to something likeBULL_QUEUES
orQUEUE_CONFIGS
to be more explicit about its purpose in the context of BullMQ.
36-49
: Handle potential queue registration failures.While registering multiple queues via a flat map is convenient, consider adding error handling or logging if any queue registration fails. This ensures better debugging and operational clarity in case of misconfiguration or unexpected system states.
52-52
: Export only necessary modules.Currently,
BullModule
is exported. If no other module depends on it directly, you can keep it internal or export only the necessary services (e.g.,OutgoingWebhookQueueService
). Restricting exports helps maintain a cleaner API surface.services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts (4)
17-24
: Log queue skipping reason if QUEUE_SYSTEM_ENABLED is false.When
env.QUEUE_SYSTEM_ENABLED
is disabled, the constructor returns early. For improved clarity, log that the queue has been skipped and won't be initialized. This helps with debugging environment-based configurations.if (!env.QUEUE_SYSTEM_ENABLED) { + this.logger.warn(`Queue ${queueName} not initialized because QUEUE_SYSTEM_ENABLED is false`); return; }
26-33
: Validate the presence of queue configuration.Throwing an error when the queue configuration is not found is valid. However, consider adding more context or suggestions on how to fix the configuration to reduce confusion for the developer or operator.
64-69
: Use a more generic log message for base worker.The message references "Webhook job," which might not always be accurate if the service is extended for non-webhook jobs. Consider including the
this.queueName
in the log message or using a more general message to avoid confusion.- this.logger.log(`Webhook job ${job.id} is active`); + this.logger.log(`Queue ${this.queueName} - job ${job.id} is active`);
75-96
: Consider capturing stack traces or extended diagnostic info on job failures.When a job fails, you're logging job IDs and attempts. For deeper diagnostics, capture or link to logs containing the full error stack, particularly when the job is permanently failing. This improves troubleshooting.
services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts (1)
8-13
: Add validation for IncomingWebhookData interface.The interface lacks validation for required fields and type constraints.
Consider using class-validator decorators:
import { IsString, IsObject, IsFunction } from 'class-validator'; class IncomingWebhookData { @IsString() source: string; @IsObject() payload: Record<string, unknown>; @IsFunction() service: (payload: Record<string, unknown>) => Promise<void>; }services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (1)
56-56
: Extract timeout value to a constant.The timeout value of 15000ms is duplicated. Consider extracting it to a constant for better maintainability.
+const WEBHOOK_TIMEOUT_MS = 15_000; + export class WebhookManagerService { // ... private async sendWebhook<T>({ // ... - timeout: 15_000, + timeout: WEBHOOK_TIMEOUT_MS, // ... }) { // ... - timeout: 15_000, + timeout: WEBHOOK_TIMEOUT_MS, // ... } }Also applies to: 69-69
services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts (1)
98-105
: Extract webhook configuration to a reusable constant.The webhook configuration object is defined inline and could be reused across different webhook calls.
+const createWebhookConfig = (url: string, payload: unknown, secret: string) => ({ + url, + method: 'POST' as const, + headers: {}, + body: payload, + timeout: 15_000, + secret, +}); + - const webhookArgs = { - url, - method: 'POST', - headers: {}, - body: payload, - timeout: 15_000, - secret: webhookSharedSecret, - } as const; + const webhookArgs = createWebhookConfig(url, payload, webhookSharedSecret);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts
(2 hunks)services/workflows-service/src/bull-mq/bull-mq.module.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/events/document-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/events/workflow-completed-webhook-caller.ts
(4 hunks)services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test_windows
- GitHub Check: test_linux
- GitHub Check: build (windows-latest)
- GitHub Check: build (ubuntu-latest)
- GitHub Check: Analyze (javascript)
- GitHub Check: lint
🔇 Additional comments (11)
services/workflows-service/src/bull-mq/bull-mq.module.ts (2)
19-27
: Add retry or fallback logic to ensure resilient connections to Redis.This was highlighted in a previous review comment. If Redis is unavailable at startup, the module may fail to initialize. Consider adding retry strategies or graceful fallbacks to ensure the service remains robust.
31-34
: Secure access to the Bull Board route in production.As noted before, exposing queue monitoring at
/queues
without authentication or IP restriction may lead to security issues. Evaluate environment-based access control or authentication to protect this endpoint in production.services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts (1)
136-141
: Close the worker first, then pause the queue.Pausing the queue while the worker is still processing a job may lead to unexpected states. Consider reversing the order: close the worker to ensure no new jobs are processed, then pause the queue to queue up subsequent incoming jobs until fully resumed.
Do you want me to generate a script to confirm that reversing the order doesn't break Worker internals or tests?
services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts (2)
39-46
: Previous review comments about error handling are still applicable.The error handling concerns from previous reviews remain unaddressed:
- No specific handling for network timeouts
- Missing request details in error logs
- No retry logic for transient failures
35-37
: Ensure consistent payload format for HMAC signing.When computing the HMAC signature, the body could be either a string or an object. This inconsistency could lead to invalid signatures.
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (3)
16-17
: LGTM!The new imports for environment configuration and queue service are properly scoped and align with the BullMQ integration objectives.
31-31
: LGTM!The constructor properly injects the queue service following dependency injection pattern and TypeScript best practices.
Line range hint
109-189
: Critical: Fix code duplication in webhook handling.The code contains duplicate blocks for:
- Payload construction
- Queue system enabled check
- Webhook arguments construction
Additionally, there's an inconsistency in handling
childWorkflowsRuntimeData
between the two blocks.This issue was previously identified. Please refer to the earlier review comment that provided a solution for:
- Moving payload construction to a separate method
- Removing the duplicate queue system check
- Consolidating the webhook argument construction
services/workflows-service/src/events/document-changed-webhook-caller.ts (3)
16-18
: LGTM!The imports correctly reflect the architectural changes:
- Addition of queue service and outgoing webhooks service
- Removal of direct signing in favor of the service-based approach
40-41
: LGTM!The constructor properly injects both services following dependency injection pattern and TypeScript best practices.
182-220
: LGTM!The webhook handling implementation demonstrates:
- Clean payload construction
- Proper queue system integration
- Good separation of concerns using the outgoing webhook service
- Comprehensive error handling
async handleJob(job: Job<TJobArgs>) { | ||
await this.outgoingWebhookService.invokeWebhook(job.data.jobData); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle exceptions from invokeWebhook
calls gracefully.
If an external webhook fails, consider adding a retry, detailed logging, or additional fallback logic. Relying exclusively on max job attempts can obscure granular webhook errors that may be helpful for debugging or alerting.
super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger); | ||
this.initializeWorker(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Remove redundant initializeWorker
call
The base class constructor in base-queue-worker.service.ts
already calls this.initializeWorker()
. Calling it again in the subclass creates duplicate worker instances and event listeners. Remove this redundant call.
🔗 Analysis chain
Avoid calling initializeWorker
multiple times.
The base class constructor already calls this.initializeWorker()
. Re-invoking it in the subclass constructor can lead to redundant or conflicting event listeners. Confirm if this call is truly necessary.
constructor(
protected readonly logger: AppLoggerService,
protected outgoingWebhookService: OutgoingWebhooksService,
) {
super(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name, logger);
- this.initializeWorker();
}
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Find the base class implementation and constructor
ast-grep --pattern 'class $_ {
constructor($_) {
$$$
}
$$$
}'
# Also search for any direct references to initializeWorker
rg "initializeWorker" -A 5
Length of output: 65840
Script:
#!/bin/bash
# Find the base class and initializeWorker implementation in bull-mq directory
cd services/workflows-service/src/bull-mq && rg -l "class.*extends" .
rg "initializeWorker" . -B 5 -A 5
Length of output: 2415
async handleJob(job: Job<TJobsWebhookIncoming>) { | ||
this.logger.log(`Processing webhook job ${job.id}`); | ||
|
||
const { service: workingService, payload } = job.data.jobData; | ||
// TODO - handle the invoking webhook job internally | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement error handling and complete the TODO.
The handleJob
method needs implementation and proper error handling:
- The TODO comment indicates incomplete implementation
- No try-catch block for error handling
- No validation of job data
- No logging of job completion status
Consider implementing like this:
async handleJob(job: Job<TJobsWebhookIncoming>) {
this.logger.log(`Processing webhook job ${job.id}`);
+ try {
+ const { service: workingService, payload } = job.data.jobData;
- const { service: workingService, payload } = job.data.jobData;
- // TODO - handle the invoking webhook job internally
+ // Validate job data
+ if (!workingService || !payload) {
+ throw new Error('Invalid job data: missing required fields');
+ }
+
+ // Process the webhook
+ await workingService(payload);
+
+ this.logger.log(`Successfully processed webhook job ${job.id}`);
+ } catch (error) {
+ this.logger.error(`Failed to process webhook job ${job.id}`, error);
+ throw error; // Re-throw to mark job as failed
+ }
}
Committable suggestion skipped: line range outside the PR's diff.
const headers: RawAxiosRequestHeaders = { | ||
Accept: 'application/json', | ||
'Content-Type': 'application/json', | ||
'Access-Control-Allow-Origin': '*', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unnecessary CORS header.
The Access-Control-Allow-Origin: '*'
header should not be set on outgoing requests. CORS headers are response headers used by servers to control cross-origin access.
headers: RawAxiosRequestHeaders = {
Accept: 'application/json',
'Content-Type': 'application/json',
- 'Access-Control-Allow-Origin': '*',
...argsHeaders,
};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
'Access-Control-Allow-Origin': '*', | |
headers: RawAxiosRequestHeaders = { | |
Accept: 'application/json', | |
'Content-Type': 'application/json', | |
...argsHeaders, | |
}; |
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: data as unknown as AnyRecord, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid type assertion with unknown.
The type assertion data as unknown as AnyRecord
is a code smell. Consider proper type validation or transformation.
-body: data as unknown as AnyRecord,
+body: validateAndTransformData(data),
+function validateAndTransformData<T>(data: T): AnyRecord {
+ // Add validation logic here
+ return data as AnyRecord;
+}
Also applies to: 68-68
const webhookArgs = { | ||
url, | ||
method: 'POST', | ||
headers: {}, | ||
body: payload, | ||
timeout: 15_000, | ||
secret: webhookSharedSecret, | ||
} as const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Standardize webhook arguments structure.
The webhook arguments structure varies between the two implementations. Standardize it to use a single, consistent format throughout the code.
- const webhookArgs = {
- url,
- method: 'POST',
- headers: {},
- body: payload,
- timeout: 15_000,
- secret: webhookSharedSecret,
- } as const;
+ const webhookArgs = {
+ requestConfig: {
+ url,
+ method: 'POST',
+ headers: {},
+ body: payload,
+ timeout: 15_000,
+ },
+ customerConfig: {
+ webhookSharedSecret,
+ },
+ } as const;
Also applies to: 181-188
… functionality vs unqueued
* feat: added dlq logic for each queue * updated queue worker listener logic * feat: added pausing and resuming the workers on destroy * feat: set queue worker logic functionaloity * feat: finalized functionality of base queue and optional dlq * feat: added webhook handling logic
bba7beb
to
1a7acc9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (1)
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (1)
109-135
: 🛠️ Refactor suggestionRefactor duplicate payload construction.
The payload construction logic is duplicated. Consider extracting it into a separate method as suggested in the previous review.
🧹 Nitpick comments (10)
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (2)
56-56
: Extract timeout value to configuration.The timeout value of 15,000ms is hardcoded. Consider moving this to a configuration value for better maintainability.
+// In your configuration file or environment variables +WEBHOOK_TIMEOUT_MS=15000 -timeout: 15_000, +timeout: env.WEBHOOK_TIMEOUT_MS,
76-76
: Improve error type declaration.The error catch clause uses a union type of
Error | any
which effectively just becomesany
. Consider using a more specific error type or removing the union withany
.-} catch (error: Error | any) { +} catch (error: Error) {🧰 Tools
🪛 Biome (1.9.4)
[error] 76-76: Catch clause variable type annotation must be 'any' or 'unknown' if specified.
(parse)
services/workflows-service/src/webhooks/incoming/incoming-webhooks.module.ts (1)
Line range hint
42-55
: Consider reducing module coupling.The module has numerous dependencies which could make it harder to maintain and test. Consider:
- Breaking down the module into smaller, more focused sub-modules
- Resolving the circular dependency with
AuthModule
using an interface or event-based approachservices/workflows-service/src/bull-mq/bull-mq.module.ts (1)
38-51
: Conditionally register queues based on environment
If you anticipate that certain queues won't be used or want to limit queue overhead in certain deployments, consider a conditional registration approach using an environment toggle (e.g.,QUEUE_SYSTEM_ENABLED
). This can reduce resource usage and potential misconfigurations....Object.values(QUEUES).flatMap(queue => { + if (!configService.get<boolean>('QUEUE_SYSTEM_ENABLED')) { + return []; + } const queues: Array<Omit<RegisterQueueOptions, 'name'> & { name: string }> = [ { name: queue.name, ...queue.config }, ];services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts (1)
24-25
: Refactor to address circular dependency
The comment hints at a potential circular dependency involvingIncomingWebhooksService
. Consider restructuring your modules or placing shared interfaces in a separate module to avoid cyclical references.services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts (4)
29-31
: Enhance job addition with retry options and error handling.The
addJob
method should include default retry options and handle potential errors.Consider this implementation:
-async addJob(jobData: T, metadata: TJobPayloadMetadata = {}, jobOptions = {}): Promise<void> { - await this.queue?.add(randomUUID(), { metadata, jobData }, jobOptions); +async addJob(jobData: T, metadata: TJobPayloadMetadata = {}, jobOptions = {}): Promise<void> { + try { + const defaultOptions = { + attempts: 3, + backoff: { + type: 'exponential', + delay: 1000, + }, + }; + + await this.queue?.add( + randomUUID(), + { metadata, jobData }, + { ...defaultOptions, ...jobOptions }, + ); + } catch (error) { + this.logger.error(`Failed to add job to queue ${this.queue.name}:`, error); + throw error; + } +}
33-44
: Add worker concurrency and rate limiting options.The worker initialization could benefit from additional configuration options to control job processing.
Consider adding these options:
protected initializeWorker() { this.worker = new Worker(this.queue.name, this.handleJob.bind(this), { connection: { host: this.configService.get('REDIS_HOST'), port: this.configService.get('REDIS_PORT'), password: this.configService.get('REDIS_PASSWORD'), }, + concurrency: this.configService.get<number>('QUEUE_CONCURRENCY', 1), + limiter: { + max: this.configService.get<number>('QUEUE_RATE_LIMIT_MAX', 100), + duration: this.configService.get<number>('QUEUE_RATE_LIMIT_DURATION', 1000), + }, + skipDelayCheck: true, }); this.addWorkerListeners(); this.addQueueListeners(); }
58-63
: Enhance failed job handling and DLQ processing.The failed job handling could be more robust with additional context and data transformation.
Consider this implementation:
-listener: async job => { - if (!job?.opts.attempts || job.attemptsMade < job.opts.attempts) return; - - this.logger.error(`Job ${job?.id} failed permanently. Moving to DLQ.`); - await this.deadLetterQueue.add(randomUUID(), job?.data); -}, +listener: async (job, err) => { + if (!job?.opts.attempts || job.attemptsMade < job.opts.attempts) { + this.logger.warn(`Job ${job?.id} failed attempt ${job.attemptsMade}/${job.opts.attempts}:`, err); + return; + } + + this.logger.error(`Job ${job?.id} failed permanently after ${job.attemptsMade} attempts:`, err); + + const dlqData = { + originalJob: { + id: job.id, + data: job.data, + opts: job.opts, + }, + error: { + message: err.message, + stack: err.stack, + }, + failedAt: new Date().toISOString(), + }; + + await this.deadLetterQueue.add(randomUUID(), dlqData, { + attempts: 1, + removeOnComplete: true, + }); +},
103-124
: Improve lifecycle hooks with better error handling and cleanup.The module lifecycle hooks could be more robust with proper error handling and cleanup.
Consider this implementation:
async onModuleDestroy() { + try { await this.queue?.pause(); - await Promise.all([this.worker?.close(), this.queue?.close()]); + await Promise.all([ + this.worker?.close(), + this.queue?.close(), + this.deadLetterQueue?.close(), + ]); this.logger.log(`Queue ${this.queue.name} is paused and closed`); + } catch (error) { + this.logger.error(`Failed to cleanup queue ${this.queue.name}:`, error); + } } async onModuleInit() { - if (this.queue) { + try { + if (!this.queue) return; + const isPaused = await this.queue.isPaused(); if (isPaused) { await this.queue.resume(); } const isPausedAfterResume = await this.queue?.isPaused(); if (isPausedAfterResume) { this.logger.error(`Queue ${this.queue.name} is still paused after trying to resume it`); + throw new Error(`Failed to resume queue ${this.queue.name}`); } + } catch (error) { + this.logger.error(`Failed to initialize queue ${this.queue.name}:`, error); + throw error; } }services/workflows-service/package.json (1)
8-8
: Add Redis readiness check in setup script.The setup script should wait for Redis to be ready before proceeding with subsequent commands. Consider adding a wait-on check for Redis similar to the DB check.
- "setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && npm run db:reset && npm run seed", + "setup": "npm run docker:db:down && npm run docker:db && wait-on tcp:5432 && npm run docker:redis:down && npm run docker:redis && wait-on tcp:${REDIS_PORT:-6379} && npm run db:reset && npm run seed",
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (24)
services/workflows-service/docker-compose.redis.yml
(1 hunks)services/workflows-service/package.json
(4 hunks)services/workflows-service/prisma/data-migrations
(1 hunks)services/workflows-service/src/alert/alert.module.ts
(2 hunks)services/workflows-service/src/alert/webhook-manager/webhook-http.service.ts
(0 hunks)services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts
(2 hunks)services/workflows-service/src/app.module.ts
(3 hunks)services/workflows-service/src/bull-mq/bull-mq.module.ts
(1 hunks)services/workflows-service/src/bull-mq/consts.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/base-queue-worker.service.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts
(1 hunks)services/workflows-service/src/bull-mq/types.ts
(1 hunks)services/workflows-service/src/env.ts
(1 hunks)services/workflows-service/src/events/document-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/events/workflow-completed-webhook-caller.ts
(4 hunks)services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts
(3 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.controller.ts
(2 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.module.ts
(3 hunks)services/workflows-service/src/webhooks/incoming/incoming-webhooks.service.ts
(1 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.module.ts
(1 hunks)services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts
(1 hunks)services/workflows-service/src/workflow/workflow.module.ts
(2 hunks)services/workflows-service/src/workflow/workflow.service.unit.test.ts
(1 hunks)
💤 Files with no reviewable changes (1)
- services/workflows-service/src/alert/webhook-manager/webhook-http.service.ts
🚧 Files skipped from review as they are similar to previous changes (15)
- services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.module.ts
- services/workflows-service/src/webhooks/incoming/incoming-webhooks.service.ts
- services/workflows-service/prisma/data-migrations
- services/workflows-service/src/alert/alert.module.ts
- services/workflows-service/src/app.module.ts
- services/workflows-service/src/bull-mq/types.ts
- services/workflows-service/docker-compose.redis.yml
- services/workflows-service/src/workflow/workflow.module.ts
- services/workflows-service/src/env.ts
- services/workflows-service/src/bull-mq/consts.ts
- services/workflows-service/src/webhooks/outgoing-webhooks/outgoing-webhooks.service.ts
- services/workflows-service/src/webhooks/incoming/incoming-webhooks.controller.ts
- services/workflows-service/src/events/document-changed-webhook-caller.ts
- services/workflows-service/src/workflow/workflow.service.unit.test.ts
- services/workflows-service/src/events/workflow-state-changed-webhook-caller.ts
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build (windows-latest)
- GitHub Check: format
🔇 Additional comments (15)
services/workflows-service/src/events/workflow-completed-webhook-caller.ts (3)
16-17
: LGTM: Queue system integration setup.The imports and constructor injection for the queue system are properly implemented.
Also applies to: 31-31
137-144
: Standardize webhook arguments structure.The webhook arguments structure varies between the two implementations.
146-146
: Add QUEUE_SYSTEM_ENABLED to environment configuration template.The QUEUE_SYSTEM_ENABLED environment variable needs to be added to the configuration template.
services/workflows-service/src/alert/webhook-manager/webhook-manager.service.ts (3)
23-24
: LGTM! Clean dependency injection.The constructor properly injects the new queue and webhook services, following dependency injection best practices.
55-55
: Avoid type assertion with unknown.The type assertion
data as unknown as AnyRecord
is a code smell. Consider proper type validation or transformation.
69-69
: Extract timeout value to configuration.Same issue as in the queue implementation - the timeout value should be moved to configuration.
services/workflows-service/src/webhooks/incoming/incoming-webhooks.module.ts (3)
29-29
: LGTM! Clean import declarations.The imports follow NestJS conventions and use path aliases effectively.
Also applies to: 38-38
41-41
: LGTM! Module configuration follows NestJS patterns.The controller and service are properly configured, and the service is correctly exported for use in other modules.
Also applies to: 79-79, 82-82
84-84
: Verify module rename impact across the codebase.The rename from
WebhooksModule
toIncomingWebhooksModule
better reflects its responsibility.Let's verify that all imports of the old module name have been updated:
✅ Verification successful
Module rename verification completed successfully
The module rename from
WebhooksModule
toIncomingWebhooksModule
has been properly implemented across the codebase. All references are using the correct module names with a clear separation betweenIncomingWebhooksModule
andOutgoingWebhooksModule
.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for any remaining references to the old module name rg -l "WebhooksModule" --type tsLength of output: 409
Script:
#!/bin/bash # Show the context of WebhooksModule usage in each file rg "WebhooksModule" --type ts -C 2Length of output: 4523
services/workflows-service/src/bull-mq/bull-mq.module.ts (2)
21-31
: Add fallback or retry on Redis connection issues
CurrentlyBullModule.forRootAsync
uses only the basic Redis configuration. If Redis is unavailable or faces transient failures at startup, the module may fail to initialize. Add error handling or a retry strategy to improve connection resilience.
33-36
: Secure access to Bull Board
Exposing the Bull Board at/queues
can cause security or privacy concerns in a production environment. Consider restricting access via authentication, IP whitelisting, or environment-based guards to prevent unauthorized queue monitoring.services/workflows-service/src/bull-mq/queues/outgoing-webhook-queue.service.ts (1)
27-29
: Handle exceptions frominvokeWebhook
calls gracefully
If an external webhook fails, consider adding retries, detailed logs, or fallback logic. Relying solely on max job attempts can obscure important errors for debugging and alerting.services/workflows-service/src/bull-mq/queues/incoming-webhook-queue.service.ts (1)
32-37
: Implement error handling and complete the TODO
ThehandleJob
method lacks a try-catch block, validation of job data, and logging of success or failure. Proper error handling ensures failed jobs can be retried or debugged effectively.services/workflows-service/package.json (2)
35-36
: LGTM! Well-structured Redis container management scripts.The Redis scripts follow good practices:
- Using --wait flag ensures container readiness
- Using --volumes flag ensures proper cleanup
- Follows the same pattern as existing DB scripts
58-60
: Verify BullMQ package versions compatibility.Please verify the compatibility between the following packages:
@nestjs/bullmq@^10.2.1
might require a more recent version ofbullmq
than^5.13.2
- Consider using the same version constraint style (^ vs exact) across related packages
Run this script to check the compatibility:
Also applies to: 63-63, 90-90
✅ Verification successful
BullMQ package versions are compatible ✅
The current version of
bullmq@^5.13.2
satisfies the peer dependency requirements of@nestjs/bullmq@^10.2.1
. The caret (^) version constraint is appropriate as it allows for compatible minor version updates while preventing breaking changes.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check BullMQ packages compatibility and latest versions echo "Checking @nestjs/bullmq peer dependencies..." npm info @nestjs/[email protected] peerDependencies echo -e "\nChecking latest versions..." npm show bullmq versions --json | jq -r '.[-5:]' npm show @nestjs/bullmq versions --json | jq -r '.[-5:]' npm show @bull-board/api versions --json | jq -r '.[-5:]'Length of output: 717
if (env.QUEUE_SYSTEM_ENABLED) { | ||
return await this.outgoingWebhookQueueService.addJob(webhookArgs); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove duplicate queue system checks.
The queue system enabled check and job addition logic is duplicated. This could lead to maintenance issues and inconsistencies.
Consider restructuring the code to have a single queue system check:
const webhookArgs = {
requestConfig: {
url,
method: 'POST',
headers: {},
body: payload,
timeout: 15_000,
},
customerConfig: {
webhookSharedSecret,
},
} as const;
if (env.QUEUE_SYSTEM_ENABLED) {
return await this.outgoingWebhookQueueService.addJob(webhookArgs);
}
try {
- // Duplicate payload construction removed...
- if (env.QUEUE_SYSTEM_ENABLED) {
- return await this.outgoingWebhookQueueService.addJob({...});
- }
const res = await this.#__axios.post(url, payload, {
Also applies to: 180-189
|
||
@common.Injectable() | ||
export abstract class WebhookHttpService extends HttpService {} | ||
export class WebhookHttpService {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove empty WebhookHttpService class.
The WebhookHttpService
class appears to be empty and serves no purpose. Consider removing it since the webhook functionality has been moved to dedicated services.
-@common.Injectable()
-export class WebhookHttpService {}
export class OutgoingWebhookQueueService extends BaseQueueWorkerService<WebhookJobData> { | ||
constructor( | ||
@InjectQueue(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name) outgoingQueue: Queue, | ||
@InjectQueue(QUEUES.OUTGOING_WEBHOOKS_QUEUE.dlq) outgoingDLQ: Queue, | ||
protected readonly outgoingWebhookService: OutgoingWebhooksService, | ||
protected readonly logger: AppLoggerService, | ||
protected readonly config: ConfigService, | ||
) { | ||
super(outgoingQueue, outgoingDLQ, logger, config); | ||
} | ||
|
||
async handleJob(job: Job<TJobArgs>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Align queue worker's type parameter with the job payload
Your base class is typed as BaseQueueWorkerService<WebhookJobData>
, while your handleJob
method uses Job<TJobArgs>
where TJobArgs
wraps WebhookJobData
. This mismatch can cause confusion or type-checking complexities.
-export class OutgoingWebhookQueueService extends BaseQueueWorkerService<WebhookJobData> {
+export class OutgoingWebhookQueueService extends BaseQueueWorkerService<TJobArgs> {
constructor(
@InjectQueue(QUEUES.OUTGOING_WEBHOOKS_QUEUE.name) outgoingQueue: Queue,
...
Committable suggestion skipped: line range outside the PR's diff.
connection: { | ||
host: this.configService.get('REDIS_HOST'), | ||
port: this.configService.get('REDIS_PORT'), | ||
password: this.configService.get('REDIS_PASSWORD'), | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add validation and error handling for Redis configuration.
The Redis connection parameters should be validated and have default values to prevent runtime errors.
Consider this implementation:
connection: {
- host: this.configService.get('REDIS_HOST'),
- port: this.configService.get('REDIS_PORT'),
- password: this.configService.get('REDIS_PASSWORD'),
+ host: this.configService.get('REDIS_HOST', 'localhost'),
+ port: this.configService.get<number>('REDIS_PORT', 6379),
+ password: this.configService.get('REDIS_PASSWORD', undefined),
+ maxRetriesPerRequest: 3,
+ enableReadyCheck: true,
},
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
connection: { | |
host: this.configService.get('REDIS_HOST'), | |
port: this.configService.get('REDIS_PORT'), | |
password: this.configService.get('REDIS_PASSWORD'), | |
}, | |
connection: { | |
host: this.configService.get('REDIS_HOST', 'localhost'), | |
port: this.configService.get<number>('REDIS_PORT', 6379), | |
password: this.configService.get('REDIS_PASSWORD', undefined), | |
maxRetriesPerRequest: 3, | |
enableReadyCheck: true, | |
}, |
Summary by CodeRabbit
Based on the comprehensive changes, here are the release notes:
New Features
Improvements
Dependencies
Performance