Skip to content

Add a BullMQ Job

Add a new background job type that the API can enqueue and the worker can process.

Step 1 — Define the queue name

Open packages/queues/src/queue-names.ts and add your queue:

typescript
export const QUEUE_NAMES = {
  INVOICE_PROCESSING: 'invoice-processing',
  EMAIL_SYNC: 'email-sync',
  AUTOMATION: 'automation',
  WEBHOOK_DELIVERY: 'webhook-delivery',
  MY_JOB: 'my-job',        // add this
} as const

Step 2 — Define the job data interface

Open packages/queues/src/job-types.ts and add the interface:

typescript
export interface MyJobData {
  tenantId: string
  someParam: string
}

Also add it to the JobData union at the bottom:

typescript
export type JobData =
  | InvoiceProcessingJobData
  | EmailSyncJobData
  | AutomationJobData
  | WebhookDeliveryJobData
  | MyJobData          // add this

Rebuild the package so the API and worker pick up the change:

bash
pnpm --filter @tt/queues build

Step 3 — Enqueue from the API

In the NestJS service that should trigger the job, inject the BullMQ queue and enqueue:

typescript
import { InjectQueue } from '@nestjs/bullmq'
import { Queue } from 'bullmq'
import { QUEUE_NAMES, type MyJobData } from '@tt/queues'

@Injectable()
export class MyService {
  constructor(
    @InjectQueue(QUEUE_NAMES.MY_JOB) private readonly myQueue: Queue<MyJobData>,
  ) {}

  async triggerJob(tenantId: string, someParam: string) {
    await this.myQueue.add('my-job', { tenantId, someParam })
  }
}

Register the queue in the module:

typescript
import { BullModule } from '@nestjs/bullmq'
import { QUEUE_NAMES } from '@tt/queues'

@Module({
  imports: [
    BullModule.registerQueue({ name: QUEUE_NAMES.MY_JOB }),
  ],
  providers: [MyService],
})
export class MyModule {}

Step 4 — Process in the worker

In services/worker/src/, create a processor:

typescript
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { Job } from 'bullmq'
import { QUEUE_NAMES, type MyJobData } from '@tt/queues'

@Processor(QUEUE_NAMES.MY_JOB)
export class MyJobProcessor extends WorkerHost {
  async process(job: Job<MyJobData>): Promise<void> {
    const { tenantId, someParam } = job.data
    // do the work
  }
}

Register the processor in the worker's app module:

typescript
BullModule.registerQueue({ name: QUEUE_NAMES.MY_JOB }),
// and in providers:
MyJobProcessor,

Retry and delay options

Pass options when enqueuing:

typescript
await this.myQueue.add('my-job', data, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: true,
  removeOnFail: false,
})

TT Time Tracker — Internal Documentation