Appearance
Add a BullMQ Job
Add a new background job type that the API can enqueue and the worker can process.
Step 1 — Define the queue name
Open packages/queues/src/queue-names.ts and add your queue:
typescript
export const QUEUE_NAMES = {
INVOICE_PROCESSING: 'invoice-processing',
EMAIL_SYNC: 'email-sync',
AUTOMATION: 'automation',
WEBHOOK_DELIVERY: 'webhook-delivery',
MY_JOB: 'my-job', // add this
} as const
Step 2 — Define the job data interface
Open packages/queues/src/job-types.ts and add the interface:
typescript
export interface MyJobData {
tenantId: string
someParam: string
}
Also add it to the JobData union at the bottom:
typescript
export type JobData =
| InvoiceProcessingJobData
| EmailSyncJobData
| AutomationJobData
| WebhookDeliveryJobData
| MyJobData // add this
Rebuild the package so the API and worker pick up the change:
bash
pnpm --filter @tt/queues build
Step 3 — Enqueue from the API
In the NestJS service that should trigger the job, inject the BullMQ queue and enqueue:
typescript
import { InjectQueue } from '@nestjs/bullmq'
import { Queue } from 'bullmq'
import { QUEUE_NAMES, type MyJobData } from '@tt/queues'
@Injectable()
export class MyService {
constructor(
@InjectQueue(QUEUE_NAMES.MY_JOB) private readonly myQueue: Queue<MyJobData>,
) {}
async triggerJob(tenantId: string, someParam: string) {
await this.myQueue.add('my-job', { tenantId, someParam })
}
}
Register the queue in the module:
typescript
import { BullModule } from '@nestjs/bullmq'
import { QUEUE_NAMES } from '@tt/queues'
@Module({
imports: [
BullModule.registerQueue({ name: QUEUE_NAMES.MY_JOB }),
],
providers: [MyService],
})
export class MyModule {}
Step 4 — Process in the worker
In services/worker/src/, create a processor:
typescript
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
import { QUEUE_NAMES, type MyJobData } from '@tt/queues'
@Processor(QUEUE_NAMES.MY_JOB)
export class MyJobProcessor extends WorkerHost {
async process(job: Job<MyJobData>): Promise<void> {
const { tenantId, someParam } = job.data
// do the work
}
}
Register the processor in the worker's app module:
typescript
BullModule.registerQueue({ name: QUEUE_NAMES.MY_JOB }),
// and in providers:
MyJobProcessor,
Retry and delay options
Pass options when enqueuing:
typescript
await this.myQueue.add('my-job', data, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: true,
removeOnFail: false,
})