메시지 큐
메시지 큐는 컴퓨터 시스템에서 쓰이는 비동기 통신 프로토콜의 한 종류입니다.
응용 프로그램에서 다른 응용 프로그램으로 메시지를 보낼 수 있으며
수신자인 응용 프로그램이 검색하고 처리할 때까지 대기열에 저장됩니다.
서비스 간의 비동기 통신을 용이하게 만들어 서비스의 성능 신뢰성 확장성을 올려줍니다.
ex)
가입과 동시에 가입 인사 이메일을 받는 형태로
유저 서비스와 이메일 서비스가 나눠져 있을 경우
서비스가 잘 작동하고 있다면 가입과 동시에 이메일을 받을 수 있지만
이메일 서비스가 응답에 실패하거나 일시적으로 응답하지 않는다면
사용자가 이메일을 받을 수 없게 됩니다.
만약 해당 이메일이 가입 시점에 보내는 것이 중요한 서비스라면
이것을 메시지 큐를 이용해 해결할 수 있습니다.
분산 환경에서 대량의 메시지를 처리할 때 메시지 큐를 관리하는 것은 어려운 작업입니다.
Redis를 기반으로 구축된 Node.js 라이브러리 BullMQ의 도움을 받을 수 있습니다.
Job은 해야 할 일을 명세 한 데이터
Job을 리스트로 관리해 주는 것이 Queue
Queue에서 Job을 꺼내 일을 처리하는 존재를 Worker라 합니다.
Queue 에서 Job이 처리되는 과정
Queue는 다양한 유형의 Job을 관리하며 이 작업을 언제 처리할지 결정합니다.
Job은 Queue에 추가된 시점에 해당 상태에 머무릅니다.
Wait job이 처리되기 전 일반적으로 진입하는 상태
Prioritized 높은 우선순위를 가진 Job이 처리되기 위해 진입하는 상태입니다.
Delayed 대기 시간을 부여 받은 Job이 처리를 기다리는 상태입니다.
priority 옵션에 따라 Wait 또는 Prioritized상태로 진입할 수 있습니다.
Job이 실제로 처리되기 시작하면 Active 상태로 진입합니다.
처리가 성공적으로 완료되면 Completed 상태로 예외가 발생하여
실패하게 된다면 Failed 상태로 진입합니다.
메시지 큐는 아래 상황에서 자주 사용됩니다.
메인 스레드가 블로킹 없이 시간이 오래 걸리는 작업을 처리할 때
이메일 보내기 이미지 처리 보고서 생성 주문 처리
특정 시간 또는 간격으로 작업을 실행해야 할 때
메일 데이터 베이스 정리 이메일 뉴스레터 발송
부하 방지를 위해 작업 속도를 제어해야 할 때
큐 라이브러리
큐 라이브러리는 작업을 저장하고 처리하는 데이터 구조인 큐를 관리하는 소프트웨어 도구입니다.
큐는 FIFO First in firstOut 선입선출 방식으로 작동하여 먼저 들어온 작업이 먼저 처리되는 방식입니다.
큐 라이브러리는 이러한 작업 큐를 생성하고 작업을 추가하고 처리하고 상태를 관리하는 기능을 제공합니다.
작업 추가 : 새로운 작업을 큐에 추가할 수 있습니다.
작업 처리 : 큐에서 작업을 꺼내어 처리할 수 있습니다.
작업 상태 : 작업의 성공 실패 대기 상태등을 관리할 수 있습니다.
재시도 기능 : 실패한 작업을 자동으로 재시도 할 수 있는 기능을 제공합니다.
우선순위 설정 : 특정 작업의 우선순위를 높여 더 빨리 처리할 수 있도록 설정할 수 있습니다.
모니터링 : 큐와 작업의 상태를 모니터링하고 관리할 수 있는 도구를 제공합니다.
BullMQ
![[Pasted image 20250210204438.png]]
Redis를 기반으로 하여 구축된 빠르고 강력한 작업 큐 시스템을 구현한
Node.js 라이브러리 입니다.
주요 목표
정확한 한번의 큐 : 모든 메시지를 정확히 한번 전달하려고 시도하고
최악의 경우에도 최소 한번은 전달합니다.
수평 확장 용이
작업을 병렬로 처리하기 위해 더 많은 워커를 추가하여 쉽게 확장할 수 있습니다.
일관성
안정적인 작업 처리를 보장
고성능
Redis에서 가능한 최대 처리량을 얻기 위해
효율적인 (lua)루아 스크립트와 파이프라인을 결합하여
높은 성능을 추구합니다.
주요 기능
최소한의 CPU사용 폴링이 없는 설계로 인해 CPU 사용량이 최소화됩니다.
Redis 기반의 분산 작업 실행 Redis를 사용하여 작업을 분산처리합니다.
LIFO 및 FIFO 작업 후입선출 및 선입선출 방식의 작업 처리 지원
지연 작업 특정 시간 이후에 실행될 작업을 설정할 수 있습니다.
정기적이고 반복 가능한 작업 : 크론 스펙에 따라 작업을 예약할 수 있습니다.
워커별 동시성 설정 각 워커에 대한 동시성 설정을 지원합니다.
프로세스 충돌 자동 복구 프로세스가 충돌할 경우 자동으로 복구합니다.
부모 - 자식 의존성 작업간의 의존성을 관리할 수 있습니다.
워커별 동시성 : 한번에 얼마나 처리할 수 있는지 작업의 수를 조정하는 기능
언제 사용하는가?
많은 작업을 처리해야 할 때
비동기적으로 작업을 처리해야 할 때
여러 단계의 작업이 있는 경우
빠른시작
설치
npm install bullmq
import { Queue } from 'bullmq';
// 'foo'라는 이름의 큐를 생성합니다.
const myQueue = new Queue('foo');
// 작업을 큐에 추가하는 비동기 함수 정의
async function addJobs() {
// 'myJobName'이라는 이름의 작업을 큐에 추가하고, 데이터로 { foo: 'bar' }를 전달합니다.
await myQueue.add('myJobName', { foo: 'bar' });
// 또 다른 'myJobName' 작업을 큐에 추가하고, 데이터로 { qux: 'baz' }를 전달합니다.
await myQueue.add('myJobName', { qux: 'baz' });
}
// addJobs 함수를 호출하여 큐에 작업을 추가합니다.
await addJobs();
해당 예제 성공적으로 실행하려면 로컬 컴퓨터에서 Redis 서비스를 실행해야 합니다.
해당 설정 주소
https://docs.bullmq.io/guide/connections
연결
Queue 작업을 시작하려면 Redis 인스턴스에 대한 연결이 필요합니다.
BullMQ는 노드 모듈 ioredis를 사용하고 BullMQ에 전달하는 옵션은
ioredis의 생성자에게 전달 됩니다.
옵션을 제공하지 않으면 포트 6379와 localhost로 기본 설정됩니다.
모든 클래스는 최소한 하나의 Redis 연결을 소비하지만 어떤 상황에서는 연결을 재사용할 수 도 있습니다.
Queue및 Worker클래스는 기존 ioredis인스턴스를 허용하고
그 연결을 재사용할 수 있지만
QueueScheduler 및 QueueEvents는 Redis에 대한 차단 연결을 요구하기 때문에 이를 수행할 수 없으며 이는 재사용을 불가능하게 만듭니다.
차단 연결을 사용하면 재사용이 불가능하다.
import { Queue, Worker } from 'bullmq';
// Redis에 연결하여 'myqueue'라는 이름의 새로운 큐를 생성합니다.
const myQueue = new Queue('myqueue', {
connection: {
host: 'myredis.taskforce.run', // Redis 서버 호스트
port: 32856, // Redis 서버 포트
},
});
// 동일한 'myqueue'에서 작업을 처리할 워커를 생성합니다.
// 워커는 큐에서 들어오는 작업을 처리하는 역할을 합니다.
const myWorker = new Worker('myqueue', async job => {
// 여기서 각 작업(job)을 처리하는 로직을 작성합니다.
}, {
connection: {
host: 'myredis.taskforce.run', // Redis 서버 호스트
port: 32856, // Redis 서버 포트
},
});
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
import IORedis from 'ioredis'; // IORedis 라이브러리를 임포트합니다.
// 새로운 IORedis 인스턴스를 생성하여 Redis와 연결합니다.
const connection = new IORedis();
// 두 개의 서로 다른 큐를 생성합니다.
// 'myFirstQueue'라는 이름의 큐를 생성하고, 이전에 생성한 Redis 연결을 재사용합니다.
const myFirstQueue = new Queue('myFirstQueue', { connection });
// 'mySecondQueue'라는 이름의 큐를 생성하고, 동일한 Redis 연결을 재사용합니다.
const mySecondQueue = new Queue('mySecondQueue', { connection });
import { Worker } from 'bullmq'; // BullMQ의 Worker 클래스를 임포트합니다.
import IORedis from 'ioredis'; // IORedis 라이브러리를 임포트합니다.
// Redis와 연결하기 위한 IORedis 인스턴스를 생성합니다.
// maxRetriesPerRequest를 null로 설정하여 요청 실패 시 재시도를 무한으로 허용합니다.
const connection = new IORedis({ maxRetriesPerRequest: null });
// 두 개의 서로 다른 워커를 생성합니다.
// 'myFirstWorker'라는 이름의 워커를 생성하고, 이전에 생성한 Redis 연결을 재사용합니다.
const myFirstWorker = new Worker('myFirstWorker', async job => {
// 여기서 각 작업(job)을 처리하는 로직을 작성합니다.
}, {
connection, // Redis 연결을 설정합니다.
});
// 'mySecondWorker'라는 이름의 워커를 생성하고, 동일한 Redis 연결을 재사용합니다.
const mySecondWorker = new Worker('mySecondWorker', async job => {
// 여기서 각 작업(job)을 처리하는 로직을 작성합니다.
}, {
connection, // Redis 연결을 설정합니다.
});
세 번째 예시에서 ioredis 인스턴스가 재사용되고 있더라도 작업자는 차단 연결을 만들기 위해 내부적으로 필요한 중복 연결을 생성합니다.
여기서 이야기하는 차단 연결은 블로킹이다.
해당 블로킹이 시작되면 특정 작업이 완료될때까지 다른작업이 대기해야함을 의미합니다.
import { Worker } from 'bullmq'; // BullMQ의 Worker 클래스를 임포트합니다.
import IORedis from 'ioredis'; // IORedis 라이브러리를 임포트합니다.
// Redis와 연결하기 위한 IORedis 인스턴스를 생성합니다.
// maxRetriesPerRequest를 null로 설정하여 요청 실패 시 재시도를 무한으로 허용합니다.
const connection = new IORedis({ maxRetriesPerRequest: null });
// 'foo'라는 이름의 큐에서 작업을 처리하는 워커를 생성합니다.
const worker = new Worker(
'foo', // 작업을 처리할 큐의 이름
async job => {
// 작업이 처리될 때 호출되는 비동기 함수
// 첫 번째 작업의 데이터가 { foo: 'bar' }일 경우와
// 두 번째 작업의 데이터가 { qux: 'baz' }일 경우를 출력합니다.
console.log(job.data); // 작업의 데이터 출력
},
{ connection }, // Redis 연결 설정
);
원하는 만큼의 작업자 프로세스를 추가할 수 있으며 BullMQ는 라운드 로빈 방식으로 작업을 모든 작업자 간에 분산시킵니다.
완료된 작업을 수신하려면 작업자에 리스너를 연결하면 됩니다.
// 작업이 성공적으로 완료되었을 때 발생하는 이벤트 리스너
worker.on('completed', job => {
// 완료된 작업의 ID를 콘솔에 출력합니다.
console.log(`${job.id} has completed!`);
});
// 작업이 실패했을 때 발생하는 이벤트 리스너
worker.on('failed', (job, err) => {
// 실패한 작업의 ID와 오류 메시지를 콘솔에 출력합니다.
console.log(`${job.id} has failed with ${err.message}`);
});
사용 가능한 이벤트 들
대기열에 작업 추가
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
// 'Paint'라는 이름의 큐를 생성합니다.
const queue = new Queue('Paint');
// 'cars'라는 이름의 작업을 큐에 추가합니다.
// 작업 데이터로 { color: 'blue' }를 전달하여 색상을 지정합니다.
queue.add('cars', { color: 'blue' });
작업자의 작업을 처리합니다.
import { Worker } from 'bullmq'; // BullMQ의 Worker 클래스를 임포트합니다.
// 'Paint'라는 이름의 큐에서 작업을 처리하는 워커를 생성합니다.
const worker = new Worker('Paint', async job => {
// 작업의 이름이 'cars'인 경우에만 실행됩니다.
if (job.name === 'cars') {
// job.data.color를 사용하여 차량의 색상을 지정하는 함수를 호출합니다.
await paintCar(job.data.color);
}
});
// paintCar 함수는 차량을 지정된 색상으로 칠하는 로직을 구현해야 합니다.
완료한 작업 리스너
import { QueueEvents } from 'bullmq'; // BullMQ의 QueueEvents 클래스를 임포트합니다.
// 'Paint'라는 이름의 큐에 대한 이벤트를 관리하는 인스턴스를 생성합니다.
const queueEvents = new QueueEvents('Paint');
// 작업이 성공적으로 완료되었을 때 발생하는 이벤트 리스너
queueEvents.on('completed', ({ jobId }) => {
// 작업이 완료되었음을 콘솔에 출력합니다.
console.log('done painting');
});
// 작업이 실패했을 때 발생하는 이벤트 리스너
queueEvents.on(
'failed',
({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
// 실패한 작업의 ID와 실패 이유를 콘솔에 출력합니다.
console.error('error painting', failedReason);
},
);
때로는 특정 장소에서 모든 작업자 이벤트를 수신해야 하는데 이를 위해서는 특수 클래스를 사용해야합니다.
import { QueueEvents } from 'bullmq'; // BullMQ의 QueueEvents 클래스를 임포트합니다.
// QueueEvents 인스턴스를 생성합니다. 큐 이름을 지정하지 않은 경우, 기본 큐가 사용됩니다.
const queueEvents = new QueueEvents();
// 작업이 대기 중일 때 발생하는 이벤트 리스너
queueEvents.on('waiting', ({ jobId }) => {
// 대기 중인 작업의 ID를 콘솔에 출력합니다.
console.log(`A job with ID ${jobId} is waiting`);
});
// 작업이 활성화될 때 발생하는 이벤트 리스너
queueEvents.on('active', ({ jobId, prev }) => {
// 활성화된 작업의 ID와 이전 상태를 콘솔에 출력합니다.
console.log(`Job ${jobId} is now active; previous status was ${prev}`);
});
// 작업이 성공적으로 완료되었을 때 발생하는 이벤트 리스너
queueEvents.on('completed', ({ jobId, returnvalue }) => {
// 완료된 작업의 ID와 반환된 값을 콘솔에 출력합니다.
console.log(`${jobId} has completed and returned ${returnvalue}`);
});
// 작업이 실패했을 때 발생하는 이벤트 리스너
queueEvents.on('failed', ({ jobId, failedReason }) => {
// 실패한 작업의 ID와 실패 이유를 콘솔에 출력합니다.
console.log(`${jobId} has failed with reason ${failedReason}`);
});
});
이벤트 수명 주기
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
const myQueue = new Queue('Paint'); // 'Paint'라는 이름의 큐를 생성합니다.
// 작업이 대기 중일 때 발생하는 이벤트 리스너
myQueue.on('waiting', (job: Job) => {
// 대기 중인 작업이 처리되기를 기다리고 있습니다.
// 여기서 job 객체를 사용하여 대기 중인 작업의 세부정보를 처리할 수 있습니다.
});
import { Worker } from 'bullmq'; // BullMQ의 Worker 클래스를 임포트합니다.
const myWorker = new Worker('Paint'); // 'Paint'라는 이름의 큐에서 작업을 처리하는 워커를 생성합니다.
// 큐가 비어 있을 때 발생하는 이벤트 리스너
myWorker.on('drained', () => {
// 큐가 비워졌음을 나타내며, 더 이상 처리할 작업이 없습니다.
});
// 작업이 성공적으로 완료되었을 때 발생하는 이벤트 리스너
myWorker.on('completed', (job: Job) => {
// 작업이 완료되었음을 나타냅니다.
// 여기서 job 객체를 사용하여 완료된 작업의 세부정보를 처리할 수 있습니다.
});
// 작업이 실패했을 때 발생하는 이벤트 리스너
myWorker.on('failed', (job: Job) => {
// 작업이 실패했음을 나타냅니다.
// 여기서 job 객체를 사용하여 실패한 작업의 세부정보를 처리할 수 있습니다.
});
위 이벤트는 실제로 작업을 완료한 작업자에 대한 로컬 이벤트 입니다.
한곳에서 관리하고 싶을때는 다음 클래스를 사용할 수 있습니다.
import { QueueEvents } from 'bullmq'; // BullMQ의 QueueEvents 클래스를 임포트합니다.
// 'Paint'라는 이름의 큐에 대한 이벤트를 관리하는 인스턴스를 생성합니다.
const queueEvents = new QueueEvents('Paint');
// 작업이 성공적으로 완료되었을 때 발생하는 이벤트 리스너
queueEvents.on('completed', ({ jobId }: { jobId: string }) => {
// 모든 워커에서 작업이 완료될 때마다 호출됩니다.
// 여기서 jobId를 사용하여 완료된 작업의 ID를 처리할 수 있습니다.
});
// 작업의 진행 상황을 나타내는 이벤트 리스너
queueEvents.on(
'progress',
({ jobId, data }: { jobId: string; data: number | object }) => {
// jobId가 진행 상황 이벤트를 수신했음을 나타냅니다.
// 여기서 jobId와 data를 사용하여 진행 상황을 처리할 수 있습니다.
},
);
작업 제거
활성화 되지 않은 작업 대기 완료되거나 실패한 작업을 모두 제거
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
const queue = new Queue('paint'); // 'paint'라는 이름의 큐를 생성합니다.
await queue.drain(); // 큐의 모든 작업이 완료될 때까지 대기하고, 큐가 비워집니다.
기간내의 작업만 유지하고 특정 상태의 작업은 제거
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
const queue = new Queue('paint'); // 'paint'라는 이름의 큐를 생성합니다.
// 'paused' 상태의 작업 중에서 1분 이상 대기 중인 작업을 최대 1000개까지 삭제합니다.
const deletedJobIds = await queue.clean(
60000, // 60,000 밀리초 (1분) 이상 대기 중인 작업
1000, // 최대 1000개의 작업을 삭제
'paused', // 삭제할 작업의 상태
);
대기열과 내용 모두 삭제
import { Queue } from 'bullmq'; // BullMQ의 Queue 클래스를 임포트합니다.
const queue = new Queue('paint'); // 'paint'라는 이름의 큐를 생성합니다.
// 큐의 모든 작업과 메타데이터를 삭제합니다.
await queue.obliterate();
만약 사용한다면?!
게임 아이템 구매 예제
큐 생성 및 요청 추가
import { Queue } from 'bullmq';
const gameQueue = new Queue('gameRequests');
// 플레이어의 요청을 큐에 추가하는 함수
async function addGameRequest(playerId, action) {
await gameQueue.add('playerAction', {
playerId,
action,
});
}
// 예시: 플레이어가 아이템을 구매하는 요청 추가
addGameRequest('player123', 'buyItem');
워커 생성
import { Worker } from 'bullmq';
// 게임 요청을 처리하는 워커 생성
const gameWorker = new Worker('gameRequests', async job => {
const { playerId, action } = job.data;
switch (action) {
case 'buyItem':
// 아이템 구매 로직 처리
console.log(`Player ${playerId} is buying an item...`);
// 여기에 아이템 구매 처리 로직 추가
break;
// 추가적인 게임 액션 처리
// case 'levelUp':
// // 레벨 업 로직 처리
// break;
default:
console.log(`Unknown action: ${action}`);
}
});
이벤트 리스너 추가
// 요청이 완료되었을 때의 이벤트 리스너
gameWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed!`);
});
// 요청이 실패했을 때의 이벤트 리스너
gameWorker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed with error: ${err.message}`);
});
전체
import { Queue, Worker } from 'bullmq';
// 큐 생성
const gameQueue = new Queue('gameRequests');
// 플레이어의 요청을 큐에 추가하는 함수
async function addGameRequest(playerId, action) {
await gameQueue.add('playerAction', {
playerId,
action,
});
}
// 워커 생성
const gameWorker = new Worker('gameRequests', async job => {
const { playerId, action } = job.data;
switch (action) {
case 'buyItem':
console.log(`Player ${playerId} is buying an item...`);
// 아이템 구매 처리 로직 추가
break;
default:
console.log(`Unknown action: ${action}`);
}
});
// 이벤트 리스너
gameWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed!`);
});
gameWorker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed with error: ${err.message}`);
});
// 예시: 플레이어가 아이템을 구매하는 요청 추가
addGameRequest('player123', 'buyItem');
주요기능
BullMQ는 부하 개선 및 복잡한 작업의 수행에 최적화된 여러 기능을 제공합니다.
Concurrency
메시지 큐라고 반드시 Job을 한 번에 하나씩만 처리하지는 않습니다.
Concurrency로 여러개의 Job을 병렬로 처리하는
Worker 개수를 조정해 처리 속도를 향상시킬 수 있습니다.
여기서 Concurrency는 하나의 Node.js 인스턴스에서 실행되는 Worker의 개수를 의미합니다.
PM2나 다른 머신에서 Node.js를 실행하면 그만큼 Worker 개수가 늘어납니다.
Rate Limiting
Concurrency를 늘린다면 Job의 처리 속도는 올라갑니다.
그러나 Job이 한 번에 몰리면 CPU에 부하가 발생해 전체 서비스에 영향을 끼칠 수도 있습니다.
모든 Job을 언제나 실시간으로 처리할 필요는 없습니다.
따라서 Rate Limit로 부하를 조정할 수 있습니다.
Rate Limit를 설정하면 Concurrency가 높게 설정되어 있더라도 제한이 걸려 일정 사용량만 쓰게 됩니다.
FIFO / LIFO
BullMQ는 기본적으로 선입선출 방식으로 동작합니다.
만약 Concurrency가 1보다 높게 설정되어있다면
Worker들은 Queue에서 순서대로 Job을 꺼내어 병렬로 처리합니다.
Worker의 처리시간이 다를 수 있으므로 끝나는 시간은 순서를 보장하지 않습니다.
설정에 따라 LIFO 후입선출로 동작하기도 합니다.
Priorities
Queue에 Job을 추가할 때 priority 설정으로 우선순위를 부여할 수 있습니다.
숫자가 낮을수록 더 높은 우선순위를 가집니다.
따로 설정하지 않으면 자동으로 가장 높은 우선순위를 갖게 됩니다.
Delayed Jobs
Queue에 Job을 추가할 때 Worker가 이를 바로 처리하지 않고
대기 시간을 갖도록 할 수 있습니다.
대기 시간은 실시간으로 바뀔 수 있습니다.
사용자의 요청이 1분간 발생하지 않은 다음에 어떠한 이벤트를 처리하려고 한다면 요청이 들어올 때 대기 시간을 변경하는 식으로 이를 만족 할 수 있습니다.
RabbitMQ
![[Pasted image 20250210204502.png]]
토끼와 물소는 큐를 관리하는 곳에 사용하지만
서로 다른 목적을 가지고 다른 방식으로 작동합니다.
토끼는 기본적으로 내구성 있는 큐와 메시지 지속성을 제공하여 시스템을 다시 시작해도 메시지가 손실되지 않습니다.
물소는 지속성을 위해 Redis에 의존하는데 Redis는
내구성 기능을 제공하지만 토끼에 비하면 그정도가 낮습니다.
토끼는 배포 및 관리시 온프레미스 또는 클라우드에 배포할 수 있는 전용 토끼 서버가 필요합니다.
메시지 브로커를 모니터링하고 관리하기 위한 관리 UI가 함께 제공됩니다.
물소는 애플리케이션에 포함하는 라이브러리로 별도의 서버는 필요하지 않지만 Redis 서버는 필요합니다.
토끼는 강력한 메시징 라우팅 및 내구성이 필요한 내구성이 필요한 복잡한 엔터프라이즈 급 애플리케이션에 이상적이고
물소는 애플리케이션에서 보다 간단하고 분산된 작업이나 메시지 처리 시나리오에 적합합니다.
fastq
아주 위험한 녀석을 건드린것 같다.
내가 왜 이것을 한다고 했을까?!
DNA? RNA? 형이 왜 여기서 나와?!
멸망!!!
그리고 다시!
import fastq from 'fastq'; // fastq 패키지를 가져옵니다.
// 비동기 작업을 정의하는 함수입니다.
// data: 처리할 데이터, callback: 작업 완료 시 호출되는 콜백 함수
function task(data, callback) {
console.log(`Processing: ${data}`); // 현재 처리 중인 데이터를 콘솔에 출력합니다.
// 비동기 작업을 시뮬레이션하기 위해 setTimeout을 사용합니다.
setTimeout(() => {
// 작업이 완료되면 콜백 함수를 호출합니다.
// 첫 번째 인자는 에러, 두 번째 인자는 결과입니다.
callback(null, `Done: ${data}`);
}, 1000); // 1초 후에 작업을 완료하는 시뮬레이션
}
// 큐를 생성합니다. task 함수를 비동기 작업으로 사용하고, 동시성 수준을 2로 설정합니다.
const queue = fastq(task, 2);
// 큐에 작업을 추가합니다. 각 작업에 대해 콜백 함수를 제공합니다.
queue.push('Task 1', (err, result) => {
if (err) throw err; // 에러가 발생하면 예외를 던집니다.
console.log(result); // 작업 결과를 콘솔에 출력합니다.
});
queue.push('Task 2', (err, result) => {
if (err) throw err; // 에러 처리
console.log(result); // 작업 결과 출력
});
queue.push('Task 3', (err, result) => {
if (err) throw err; // 에러 처리
console.log(result); // 작업 결과 출력
});
해당 코드를 보면 알 수 있는 부분이
동시성 수준과 비동기 작업을 큐로 생성하여 작업한다
우선 fastq는 메모리 내에서 빠르게 작업을 처리할 수 있는 큐입니다.
성는 벤치마크 100만 개 작업 기준으로 854ms 정도 속도고
제로 오버헤드 연속 함수 호출이 필요하다면 fastseries를 확인하고
제로 오버헤드의 병렬 함수 호출이 필요하다면 fastparallel을 확인하라고 한다.
콜백 api 사용할때
'use strict';
import fastq from 'fastq'; // fastq 패키지를 가져옵니다.
const queue = fastq(worker, 1); // worker 함수와 동시성 1로 큐를 생성합니다.
queue.push(42, function (err, result) {
if (err) { throw err; } // 에러가 발생하면 예외를 던집니다.
console.log('the result is', result); // 작업 결과를 출력합니다.
});
// worker 함수 정의
function worker(arg, cb) {
cb(null, arg * 2); // 입력값을 2배로 만들어 콜백 호출
}
프로미스 api 사용할때
'use strict';
import fastq from 'fastq'; // fastq 패키지를 가져옵니다.
const queue = fastq.promise(worker, 1); // 프로미스 기반 worker 함수와 동시성 1로 큐를 생성합니다.
async function worker(arg) {
return arg * 2; // 입력값을 2배로 반환하는 비동기 함수
}
async function run() {
const result = await queue.push(42); // 큐에 작업 추가 후 결과 대기
console.log('the result is', result); // 작업 결과 출력
}
run(); // run 함수 실행
this 설정할때
'use strict';
import fastq from 'fastq'; // fastq 패키지를 가져옵니다.
const that = { hello: 'world' }; // this로 사용할 객체 정의
const queue = fastq(that, worker, 1); // context와 worker 함수, 동시성 1로 큐를 생성합니다.
queue.push(42, function (err, result) {
if (err) { throw err; } // 에러가 발생하면 예외를 던집니다.
console.log(this); // this로 설정한 객체 출력
console.log('the result is', result); // 작업 결과 출력
});
// worker 함수 정의
function worker(arg, cb) {
console.log(this); // worker 함수 내에서 this 출력
cb(null, arg * 2); // 입력값을 2배로 만들어 콜백 호출
}
fastq는 단일 서버에서 비동기 작업을 효율 적으로 처리해야 할 때 사용합니다.
메모리 내에서만 작업을 처리하므로, 단순한 요청 처리나 파일 i/o 작업에 적합합니다.
간단한 api로 비동기 작업을 큐에 추가하고 처리할 수 있습니다.
동시성 제어 기능제공
메모리 내에서 작업을 처리하기 때문에 서버가 재시작 되면 큐의 모든 작업이 사라집니다.
마지막으로 일시 정지와 재개 도 가능하다는 사실을 알게 되어 추가
import fastq from 'fastq';
function task(data, callback) {
console.log(`Processing: ${data}`);
setTimeout(() => {
callback(null, `Done: ${data}`);
}, 1000);
}
const queue = fastq(task, 2);
queue.push('Task 1');
queue.push('Task 2');
queue.push('Task 3');
// 큐를 일시 정지
queue.pause();
setTimeout(() => {
console.log('Resuming queue...');
queue.resume(); // 큐를 재개
}, 3000);
kafka
카프카는 파이프라인 스트리밍 분석 데이터 통합 및
미션 크리티컬 애플리케이션을 위해 설계된
고성능 분산 이벤트 스트리밍 플랫폼이다.
pub - sub 모델의 메시지 큐 형태로 동작하며
분산환경에 특화되어 있다.
왜 쓸까?
기존 데이터 시스템의 문제점
각 애플리케이션과 DB가 end-to-end로 연결되어 있고
각 파이프 라인이 파편화 되어있다.
시스템 복잡도가 높아지면서 다음과 같은 문제가 발생하게 되었다.
시스템 복잡도가 증가할수록
통합된 전송영역이 없어 데이터 흐름 파악하기 어렵고 관리가 어렵다
특정 부분에서 장애 발생시 조치 시간 증가
HW교체 SW업그레이드 시 관리포인트가 늘어나고 작업시간 증가
데이터 파이프라인 관리의 어려움
각 애플리케이션과 데이터 시스템 간의 별도의 파이프라인이 존재하고
파이프 라인마다 데이터 포맷과 처리 방식이 다름
새로운 파이프라인 확장이 어려워지면서 확장성 및 유연성이 떨어짐
데이터 불일치 가능성이 있어 신뢰도 감소
모든 이벤트 데이터의 흐름을 중앙에서 관리하기 위해 카프카를 사용한다.
카프카는 대량의 데이터를 실시간으로 처리하고 스트리밍 하는데 중점을 둔 분산 메시징 시스템입니다.
자체적으로 분산 시스템을 구축하며 로그 기반의 메시지 큐를 사용합니다.
실시간 데이터 스트리밍
이벤트 기반 아키텍처
로그 수집 및 분석
높은 처리량 및 내결함성
데이터의 지속성 및 내구성
수평적 확장이 용이
그룹을 통한 데이터 처리의 유연성
요약
BullQ
비동기 작업 처리 이메일 전송 이미지 처리 등과 같이 시간이 걸리는 작업을
백그라운드에서 처리할 필요가 있을때
예약 작업 특정 시간에 작업을 실행해야 하는 경우
작업 재시도 실패한 작업을 자동으로 재시도해야 할 필요가 있을 때
우선 순위 작업 작업의 중요도에 따라 우선순위를 부여하고 처리할 경우
FastQ
가벼운 비동기 작업 메모리 내에서 빠르게 처리할 수 있는 간단한 작업을 수행할 때
짧은 작업 큐 데이터의 지속성이 필요 없고 메모리 기반으로 빠른 성능이 필요한 경우
간단한 API 복잡한 설정 없이 신속하게 작업 큐를 구현하고자 할 때
Kafka
실시간 데이터 스트리밍 대량의 데이터를 실시간으로 수집하고 처리할 필요가 있을 때
이벤트 기반 아키텍처 마이크로서비스 간의 이벤트를 전달하고 처리해야 할 경우
로그 수집 및 분석 다양한 소스에서 실시간으로 로그 데이터를 수집하고 분석할 때
내구성과 확장성 필요 데이터의 지속성과 높은 처리량이 요구되는 상황일 경우
'TIL' 카테고리의 다른 글
TIL_2025-03-11_Protoc (0) | 2025.03.11 |
---|---|
TIL_2025-02-17 (0) | 2025.02.17 |
TIL_0205-02-06 (0) | 2025.02.06 |
TIL_2025-02-05_var , let , const (0) | 2025.02.05 |
TIL_2025-02-04_ 컴퓨터 구조 와 운영체제 (1) | 2025.02.04 |
댓글