Background Jobs
Background jobs in SolidX let you offload long-running or deferrable work out of request/response flows, keeping requests non-blocking.
Common use cases:
- Notification delivery (email/SMS/WhatsApp)
- External API callbacks/webhooks
- OCR/LLM processing and other long-running tasks
- Retryable integrations
SolidX uses a Work Queue / Competing Consumers model:
- Publishers enqueue jobs
- Subscribers consume jobs asynchronously
Job execution state is tracked in:
- ss_mq_message_queue -> queue definitions
- ss_mq_message -> message payload/status/retries
Core Building Blocks
Every background job setup usually contains:
- Queue options file (broker type, queue name, etc.)
- Publisher class
- PublisherFactory-based publish call from service/subscriber hook
- Subscriber class
- Module provider wiring
Typed payloads are recommended for safer publish/subscribe contracts.
Folder Conventions (Must Follow)
Based on the Solid monorepo conventions, background job code should be organized under:
- solid-api/src/<module>/background-jobs
- solid-api/src/<module>/background-jobs/database
- solid-api/src/<module>/background-jobs/rabbitmq
Directory intent:
- background-jobs/ -> queue contracts and shared job artifacts (for example queue option files and payload interfaces)
- background-jobs/rabbitmq/ -> RabbitMQ publisher/subscriber implementations
- background-jobs/database/ -> Database broker publisher/subscriber implementations
Recommended file naming:
- Queue options: <feature>-queue-options.ts
- RabbitMQ: <feature>-publisher-rabbitmq.service.ts, <feature>-subscriber-rabbitmq.service.ts
- Database: <feature>-publisher-database.service.ts, <feature>-subscriber-database.service.ts
Typical edit sequence for new jobs:
- Add a queue options file under background-jobs/.
- Add broker-specific publisher/subscriber files under background-jobs/<broker>/.
- Register providers in the owning module.
- Publish from service/subscriber-hook code via PublisherFactory.
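The provider-registration step can be sketched as a standard Nest module. The module name and import paths here are assumptions based on the OCR examples in this guide, not names prescribed by @solidxai/core:

```typescript
// Hypothetical module wiring for the OCR background job.
// Publisher and subscriber are registered as ordinary Nest providers.
import { Module } from "@nestjs/common";
import { OcrRequestPublisherRabbitmq } from "./background-jobs/rabbitmq/ocr-request-publisher-rabbitmq.service";
import { OcrRequestSubscriberRabbitmq } from "./background-jobs/rabbitmq/ocr-request-subscriber-rabbitmq.service";

@Module({
  providers: [OcrRequestPublisherRabbitmq, OcrRequestSubscriberRabbitmq],
})
export class OcrRequestModule {}
```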
1) Queue Options + Payload Contract
import { ActiveUserData, BrokerType } from "@solidxai/core";
const OCR_REQUEST_QUEUE_NAME = "ocr_request_queue";
export interface OcrRequestPayload {
ocrRequestId: number;
loggedInUser?: ActiveUserData;
}
export default {
name: "ocrRequestQueueRabbitmq",
type: BrokerType.RabbitMQ,
queueName: OCR_REQUEST_QUEUE_NAME,
prefetch: 5,
};
prefetch is only applicable to RabbitMQ (details below).
2) Publisher Class Example (RabbitMQ)
import { Injectable } from "@nestjs/common";
import {
MqMessageService,
MqMessageQueueService,
QueuesModuleOptions,
RabbitMqPublisher,
} from "@solidxai/core";
import ocrRequestQueueOptions, { OcrRequestPayload } from "../ocr-request-queue-options";
@Injectable()
export class OcrRequestPublisherRabbitmq extends RabbitMqPublisher<OcrRequestPayload> {
constructor(
protected readonly mqMessageService: MqMessageService,
protected readonly mqMessageQueueService: MqMessageQueueService
) {
super(mqMessageService, mqMessageQueueService);
}
options(): QueuesModuleOptions {
return {
...ocrRequestQueueOptions,
};
}
}
3) Recommended Publishing Pattern: PublisherFactory
In application code, publish through PublisherFactory instead of injecting publisher classes directly.
import { Injectable } from "@nestjs/common";
import { PublisherFactory } from "@solidxai/core";
import { OcrRequestPayload } from "../jobs/ocr-request-queue-options";
@Injectable()
export class OcrRequestService {
constructor(
private readonly publisherFactory: PublisherFactory<OcrRequestPayload>
) {}
async triggerOcr(ocrRequestId: number) {
await this.publisherFactory.publish(
{
payload: { ocrRequestId },
parentEntity: "ocrRequest",
parentEntityId: ocrRequestId,
},
"OcrRequestPublisher"
);
}
}
Why this is preferred:
- Broker-specific publisher resolution is centralized
- Code remains stable when QUEUES_DEFAULT_BROKER changes
- No direct dependency on RabbitMQ/Database publisher implementations in business services
Publisher name guidance:
- Prefer passing a logical base name (example: OcrRequestPublisher).
- PublisherFactory resolves the broker-specific provider (...Database / ...Rabbitmq) using QUEUES_DEFAULT_BROKER.
- Legacy RabbitMQ-suffixed names may still work (backward compatibility), but base names are recommended.
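As an illustration only (not the actual @solidxai/core source), the resolution rule described above could be sketched as:

```typescript
// Hypothetical sketch of how a logical publisher name plus the
// QUEUES_DEFAULT_BROKER value might map to a broker-specific provider name.
type Broker = "database" | "rabbitmq";

function resolvePublisherName(baseName: string, broker: Broker): string {
  // Legacy callers may already pass a broker-suffixed name;
  // keep it as-is for backward compatibility.
  if (baseName.endsWith("Rabbitmq") || baseName.endsWith("Database")) {
    return baseName;
  }
  const suffix = broker === "rabbitmq" ? "Rabbitmq" : "Database";
  return `${baseName}${suffix}`;
}
```

With this rule, "OcrRequestPublisher" resolves to "OcrRequestPublisherRabbitmq" or "OcrRequestPublisherDatabase" depending on the configured broker.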
4) Subscriber Example (RabbitMQ, OCR Workflow)
import { Injectable, Logger } from "@nestjs/common";
import { InjectEntityManager } from "@nestjs/typeorm";
import { EntityManager } from "typeorm";
import {
MqMessageService,
MqMessageQueueService,
QueueMessage,
QueuesModuleOptions,
RabbitMqSubscriber,
S3FileService,
SettingService,
SolidMicroserviceAdapter,
} from "@solidxai/core";
import ocrRequestQueueOptions, { OcrRequestPayload } from "../ocr-request-queue-options";
@Injectable()
export class OcrRequestSubscriberRabbitmq extends RabbitMqSubscriber<OcrRequestPayload> {
private readonly logger = new Logger(OcrRequestSubscriberRabbitmq.name);
constructor(
readonly mqMessageService: MqMessageService,
readonly mqMessageQueueService: MqMessageQueueService,
@InjectEntityManager() readonly entityManager: EntityManager,
private readonly settingService: SettingService,
private readonly solidMicroserviceAdapter: SolidMicroserviceAdapter,
private readonly s3FileService: S3FileService
) {
super(mqMessageService, mqMessageQueueService);
}
options(): QueuesModuleOptions {
return {
...ocrRequestQueueOptions,
};
}
async subscribe(message: QueueMessage<OcrRequestPayload>) {
const { ocrRequestId } = message.payload;
this.logger.log(`Processing OCR request ${ocrRequestId}`);
// 1) Load request data from DB
// 2) Update status -> Started
// 3) Fetch signed S3 URL
// 4) Invoke OCR/LLM pipeline
// 5) Persist output/status
// 6) Invoke callback URL
// 7) Handle failure status + callback failure separately
return { success: true };
}
}
The subscriber above is intentionally condensed. Keep the heavy business logic in dedicated services and call them from subscribe(...).
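A minimal sketch of that delegation, with a hypothetical OcrProcessingService standing in for the real OCR pipeline (class and method names are illustrative):

```typescript
// Hypothetical processing service that owns the heavy business logic
// (loading the request, running OCR, persisting output, callbacks).
class OcrProcessingService {
  async process(ocrRequestId: number): Promise<boolean> {
    // ...real pipeline steps would go here...
    return true;
  }
}

// Thin subscriber: subscribe(...) only orchestrates and reports the result.
class OcrRequestSubscriber {
  constructor(private readonly processing: OcrProcessingService) {}

  async subscribe(message: { payload: { ocrRequestId: number } }) {
    const ok = await this.processing.process(message.payload.ocrRequestId);
    return { success: ok };
  }
}
```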
RabbitMQ-Specific: prefetch (Important)
prefetch controls how many unacknowledged messages a subscriber can process concurrently per channel.
Example:
- prefetch: 1 -> strict one-by-one processing
- prefetch: 5 -> up to 5 in-flight messages
- Higher values -> higher throughput, but also higher concurrent load on DB/API dependencies
Use prefetch to tune parallelism for your workload.
Info
prefetch is a RabbitMQ concept and is not used by the Database broker implementation.
Database Broker Example (Reference)
For lightweight setups without RabbitMQ, use database-backed queues.
Queue options (Database)
import { BrokerType } from "@solidxai/core";
const MAIL_QUEUE_NAME = "solidx.email.db";
export default {
name: "solidEmailInstance",
type: BrokerType.Database,
queueName: MAIL_QUEUE_NAME,
};
Publisher (Database)
import { Injectable } from "@nestjs/common";
import {
DatabasePublisher,
MqMessageQueueService,
MqMessageService,
QueuesModuleOptions,
} from "@solidxai/core";
import mailQueueOptions from "./email-queue-options-database";
@Injectable()
export class EmailQueuePublisherDatabase extends DatabasePublisher<any> {
constructor(
protected readonly mqMessageService: MqMessageService,
protected readonly mqMessageQueueService: MqMessageQueueService
) {
super(mqMessageService, mqMessageQueueService);
}
options(): QueuesModuleOptions {
return {
...mailQueueOptions,
};
}
}
Publish invocation for database broker should still use PublisherFactory.publish(...) with the logical publisher name.
The factory resolves the broker-specific implementation.
Subscriber (Database)
import { Injectable } from "@nestjs/common";
import {
DatabaseSubscriber,
PollerService,
MqMessageQueueService,
MqMessageService,
QueueMessage,
QueuesModuleOptions,
} from "@solidxai/core";
import mailQueueOptions from "./email-queue-options-database";
@Injectable()
export class EmailQueueSubscriberDatabase extends DatabaseSubscriber<any> {
constructor(
readonly mqMessageService: MqMessageService,
readonly mqMessageQueueService: MqMessageQueueService,
readonly poller: PollerService
) {
super(mqMessageService, mqMessageQueueService, poller);
}
options(): QueuesModuleOptions {
return {
...mailQueueOptions,
};
}
subscribe(message: QueueMessage<any>) {
// Delegate to application service
return { success: true };
}
}
Naming Convention
Use clear broker-specific class names:
- NamePublisherDatabase, NameSubscriberDatabase
- NamePublisherRabbitmq, NameSubscriberRabbitmq
Register them as standard Nest providers in the relevant module.
When publishing, pass the logical publisher name to PublisherFactory.
The factory resolves <PublisherName><Broker> (with backward compatibility fallback for RabbitMQ naming).
Subscriber Logging (Quick Note)
Base subscriber classes provide a lazy logger via a protected accessor:
- RabbitMqSubscriber and DatabaseSubscriber expose this.logger
- Logger context defaults to this.constructor.name
- You can optionally override loggerContext in subclasses for custom labels
Example:
import { Injectable } from "@nestjs/common";
import { QueueMessage, RabbitMqSubscriber } from "@solidxai/core";
import { OcrRequestPayload } from "../ocr-request-queue-options";
@Injectable()
export class OcrRequestSubscriberRabbitmq extends RabbitMqSubscriber<OcrRequestPayload> {
protected get loggerContext(): string {
return "OcrRequestSubscriber";
}
async subscribe(message: QueueMessage<OcrRequestPayload>) {
this.logger.log(`Processing OCR request ${message.payload.ocrRequestId}`);
this.logger.debug(`messageId=${message.messageId}`);
return { success: true };
}
}
This avoids redeclaring private readonly logger = new Logger(...) in each subclass and keeps logs consistently namespaced.
Environment Variables
Broker selection
- QUEUES_DEFAULT_BROKER
  - database (default)
  - rabbitmq
- QUEUES_RABBIT_MQ_URL (RabbitMQ only)
  - example: amqp://guest:guest@127.0.0.1:5672
Service role
- QUEUES_SERVICE_ROLE
  - subscriber -> only consumes
  - both -> publishes and consumes
Queue enablement filter
- QUEUES_QUEUE_NAME_REGEX_TO_ENABLE
  - Regex used at subscriber startup to decide whether a subscriber should start for a queue.
  - all (or empty) means no queue-name filtering.
  - Examples:
    - ^solid_ -> only queues starting with solid_
    - ^(?!solid_).+ -> queues not starting with solid_
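The startup filter can be illustrated with a small helper. This is a hypothetical sketch of the documented behavior; the real check lives inside the subscriber base classes:

```typescript
// Hypothetical sketch: a subscriber starts only when its queue name matches
// QUEUES_QUEUE_NAME_REGEX_TO_ENABLE; "all" or an empty value disables filtering.
function shouldEnableQueue(queueName: string, pattern?: string): boolean {
  if (!pattern || pattern === "all") return true;
  return new RegExp(pattern).test(queueName);
}
```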
Startup guards used by subscribers
- SOLID_CLI_RUNNING
  - Subscribers skip startup when this is "true".
- QUEUES_DEFAULT_BROKER
  - Database subscribers start only when the broker is database.
  - RabbitMQ subscribers start only when the broker is rabbitmq.
Notification queue toggles
- COMMON_EMAIL_SHOULD_QUEUE
- COMMON_SMS_SHOULD_QUEUE
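Putting the variables together, a dedicated worker's environment might look like this (values are illustrative):

```shell
# Example worker environment (illustrative values)
QUEUES_DEFAULT_BROKER=rabbitmq
QUEUES_RABBIT_MQ_URL=amqp://guest:guest@127.0.0.1:5672
QUEUES_SERVICE_ROLE=subscriber
QUEUES_QUEUE_NAME_REGEX_TO_ENABLE=^solid_
COMMON_EMAIL_SHOULD_QUEUE=true
COMMON_SMS_SHOULD_QUEUE=true
```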
Scaling and Workload Isolation
SolidX background jobs can be scaled using the same codebase by combining:
- Role-based startup (QUEUES_SERVICE_ROLE)
- Queue-name filtering (QUEUES_QUEUE_NAME_REGEX_TO_ENABLE)
- RabbitMQ concurrency tuning (prefetch)
- Horizontal replicas (multiple worker processes/containers)
1) Role-based process split
- Keep API nodes as both when load is small.
- For higher load, run dedicated worker nodes with QUEUES_SERVICE_ROLE=subscriber.
- This isolates queue processing from HTTP request traffic.
2) Queue-based workload split
Use queue-name regex to route different queue groups to different worker pools.
Examples:
- Worker pool A: QUEUES_QUEUE_NAME_REGEX_TO_ENABLE=^solid_
- Worker pool B: QUEUES_QUEUE_NAME_REGEX_TO_ENABLE=^(?!solid_).+
This lets you scale specific business workloads independently.
3) RabbitMQ throughput tuning with prefetch
prefetch (RabbitMQ only) controls in-flight messages per subscriber instance.
- Increase prefetch to improve parallelism per process.
- Keep it aligned with downstream capacity (DB, external APIs, CPU/memory).
- Combine with multiple process replicas for horizontal scaling.
4) Horizontal replicas
Run multiple instances of the same subscriber service (PM2, ECS tasks, Kubernetes pods, etc.).
- Same queue + multiple replicas -> competing consumers distribution.
- Different regex filters + dedicated replicas -> isolated queue domains.
Deployment Note
A practical deployment pattern is to run:
- Main backend service (API + optional publisher role)
- One or more dedicated subscriber services
- Optional subscriber pools per queue regex group
PM2 example: main backend
module.exports = {
apps: [
{
name: "erp_solid_backend",
script: "npm",
args: "run start",
},
],
};
PM2 example: solid_* subscriber pool
module.exports = {
apps: [
{
name: "solid_subscribers",
script: "npm",
args: "run start",
env: {
PORT: "5000",
QUEUES_SERVICE_ROLE: "subscriber",
QUEUES_QUEUE_NAME_REGEX_TO_ENABLE: "^solid_",
SOLID_SCHEDULER_ENABLED: "false",
},
},
],
};
PM2 example: non-solid subscriber pool
module.exports = {
apps: [
{
name: "app_subscribers",
script: "npm",
args: "run start",
env: {
PORT: "4000",
QUEUES_SERVICE_ROLE: "subscriber",
QUEUES_QUEUE_NAME_REGEX_TO_ENABLE: "^(?!solid_).+",
SOLID_SCHEDULER_ENABLED: "false",
},
},
],
};
The same model maps directly to ECS:
- Build one container image from the same codebase.
- Create separate ECS task definitions/services with different environment variables.
- Scale task count independently per subscriber service based on queue pressure.
Broker Selection Guidance
Use Database broker when:
- You want zero external queue infrastructure
- Throughput is moderate
- Simplicity is preferred
Use RabbitMQ when:
- Throughput is high
- You need stronger queueing characteristics
- You want fine-grained concurrency control using
prefetch
RabbitMQ management UI (default local setup): http://localhost:15672 (guest/guest).