RabbitMQ์˜ Channel

com.rabbitmq.client.Channel ํด๋ž˜์Šค๋Š” RabbitMQ Java Client ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์˜ ํ•ต์‹ฌ ํด๋ž˜์Šค์ด๋‹ค.

์ด ํด๋ž˜์Šค๋Š” AMQP(Advanced Message Queuing Protocol) ์ฑ„๋„์„ ๋‚˜ํƒ€๋‚ด๋ฉฐ, RabbitMQ๋Š” ๋ฉ”์‹œ์ง€ ํ ์‹œ์Šคํ…œ์œผ๋กœ, Channel์€ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜๊ณผ RabbitMQ ์„œ๋ฒ„ ๊ฐ„์˜ ํ†ต์‹ ์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ ์ค‘์š”ํ•˜๋‹ค.

  • ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ <--> RabbitMQ์˜ ๋Œ€๋ถ€๋ถ„์˜ ์ƒํ˜ธ์ž‘์šฉ์ด ์ฑ„๋„์„ ํ†ตํ•ด ์ด๋ฃจ์–ด์ง„๋‹ค.

Channel์˜ ์ฃผ์š” ๊ธฐ๋Šฅ

1. ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰(publishing)

void basicPublish(
    String exchange, 
    String routingKey, 
    AMQP.BasicProperties props, 
    byte[] body)

Channel์„ ์‚ฌ์šฉํ•ด์„œ ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ์ „์†กํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋•Œ ํŠน์ • exchange์™€ routing key๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐœํ–‰ํ•œ๋‹ค.

2. ๋ฉ”์‹œ์ง€ ์†Œ๋น„(consuming)

String basicConsume(
    String queue,
    boolean autoAck,
    Consumer callback)

์ง€์ •๋œ ํ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•œ๋‹ค. autoAck๊ฐ€ true๋ฉด ์ž๋™์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ˆ˜์‹ ์„ ํ™•์ธํ•˜๊ณ , ์†Œ๋น„์ž ํƒœ๊ทธ(cosumer tage)๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ํ•ด๋‹น ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฉ”์‹œ์ง€๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์†Œ๋น„ํ•  ์ˆ˜ ์žˆ๋‹ค.

3. ๋ฉ”์‹œ์ง€ ํ™•์ธ(acknowledgements) ๐ŸŒŸ

๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•œ ํ›„ ์‘๋‹ต์„ ๋ณด๋‚ด๋Š” 3๊ฐ€์ง€ ๋ฐฉ์‹์ด ์กด์žฌํ•œ๋‹ค.

a. basicAck()

ํŠน์ • ๋ฉ”์‹œ์ง€์˜ ์ˆ˜์‹ ์„ ํ™•์ธํ•œ๋‹ค.

  • deliveryTag: ํ™•์ธ(ACK)ํ•  ๋ฉ”์‹œ์ง€์˜ ๊ณ ์œ  ํƒœ๊ทธ

  • multiple:

    • true โ†’ ํ˜„์žฌ deliveryTag๊นŒ์ง€ ์†Œ๋น„์ž๊ฐ€ ๋ฐ›์€ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ํ•œ๊บผ๋ฒˆ์— ACK ์ฒ˜๋ฆฌ

    • false โ†’ ํ•ด๋‹น deliveryTag ๋ฉ”์‹œ์ง€๋งŒ ACK ์ฒ˜๋ฆฌ

b. basicNack()

๋ฉ”์‹œ์ง€์˜ ์ˆ˜์‹ ์„ ๊ฑฐ๋ถ€ํ•˜๋ฉฐ, ๋ฆฌํ์ž‰ ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • deliveryTag: ๋ฉ”์‹œ์ง€ ๊ณ ์œ ์˜ ํƒœ๊ทธ (์ˆซ์ž 1, 2..)

  • multiple:

    • true: ํ˜„์žฌ deliveryTag๊นŒ์ง€ ์†Œ๋น„์ž๊ฐ€ ๋ฐ›์€ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ํ•œ๊บผ๋ฒˆ์— Nackํ•˜๋Š” ๊ฒƒ

    • false: ํ˜„์žฌ ๋ฉ”์‹œ์ง€๋งŒ ack ์ฒ˜๋ฆฌ

  • requeue

    • true: ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ ํ์— ๋„ฃ์Œ (์žฌ์‹œ๋„ ๊ฐ€๋Šฅ)

    • false: ๋ฉ”์‹œ์ง€๋ฅผ ๋ฒ„๋ฆฌ๊ฑฐ๋‚˜ DLX๋กœ ๋ณด๋ƒ„

c. basicReject()

basicNack๊ณผ ์œ ์‚ฌํ•˜์ง€๋งŒ ๋‹จ์ผ ๋ฉ”์‹œ์ง€์˜ ์ˆ˜์‹ ์„ ๊ฑฐ๋ถ€ํ•œ๋‹ค.

  • deliveryTag: ๋ฉ”์‹œ์ง€ ๊ณ ์œ ์˜ ํƒœ๊ทธ

  • requeue:

    • true: ๋‹ค์‹œ queue์— ๋„ฃ์Œ

    • false: ๋ฉ”์‹œ์ง€๋ฅผ ๋ฒ„๋ฆฌ๊ฑฐ๋‚˜ DLX๋กœ ๋ณด๋ƒ„

4. ํ ๊ด€๋ฆฌ(queue management)

a. Qeueu ์„ ์–ธ

ํ๋ฅผ ์„ ์–ธํ•˜๋Š” ๋ฉ”์†Œ๋“œ์ด๋‹ค. ๋งŒ์•ฝ ํ๊ฐ€ ์กด์žฌํ•˜์ง€ ์•Š์œผ๋ฉด ์ƒ์„ฑํ•œ๋‹ค.

ํ์˜ ์ด๋ฆ„(name), ์ง€์†์„ฑ(durable), ๋…์ ์ (exclusive), ์ž๋™ ์‚ญ์ œ(autoDelete) ์—ฌ๋ถ€์™€ ๊ฐ™์€ ์†์„ฑ์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • AMQP.Queue.DeclareOk์™€ ๊ฐ™์€ ํด๋ž˜์Šค๋Š” RabbitMQ Client ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๋‚ด์—์„œ AMQP ํ”„๋กœํ† ์ฝœ์˜ ๊ทœ๊ฒฉ์„ ๋”ฐ๋ฅด๋Š” ํด๋ž˜์Šค๋“ค ์ค‘ ํ•˜๋‚˜์ด๋‹ค.

arguments์—๋Š” ํ์˜ ์†์„ฑ์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. ๋งŒ์•ฝ ํ์— ttl์„ ์„ค์ •ํ•˜๋ฉด ๋ฉ”์‹œ์ง€์˜ ๋งŒ๋ฃŒ ์‹œ๊ฐ„์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

b. Queue ์‚ญ์ œ

์ง€์ •ํ•œ ์ด๋ฆ„์˜ ํ๋ฅผ ์‚ญ์ œํ•œ๋‹ค.

5. ์ต์Šค์ฒด์ธ์ง€ ๊ด€๋ฆฌ(exchange mangement)

exchange๋Š” ๋ฉ”์‹œ์ง€์˜ ๋ผ์šฐํŒ…์„ ๋‹ด๋‹นํ•˜๋Š” ์—ญํ• ์„ ํ•˜๋ฉฐ, ํ์™€ ์ง์ ‘์ ์œผ๋กœ ์—ฐ๊ฒฐ๋˜์–ด ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ์ „๋‹ฌํ•œ๋‹ค.

  • exchange์˜ ์ด๋ฆ„

  • type: direct, fanout, topic, headers ๋“ฑ

  • durable: ์„œ๋ฒ„ ์žฌ์‹œ์ž‘ ์‹œ์—๋„ ์ต์Šค์ฒด์ธ์ง€๊ฐ€ ์œ ์ง€๋ ์ง€ ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •

6. ๋ฐ”์ธ๋”ฉ ๊ด€๋ฆฌ(binding management)

queueBind๋Š” ํ์™€ ์ต์Šค์ฒด์ธ์ง€๋ฅผ ๋ฐ”์ธ๋”ฉํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ๋ฐ”์ธ๋”ฉ์ด ์ด๋ฃจ์–ด์ ธ์•ผ ๋ฉ”์‹œ์ง€๊ฐ€ ์ต์Šค์ฒด์ธ์ง€์—์„œ ํ๋กœ ๋ผ์šฐํŒ…๋  ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค.

  • queue: ๋ฐ”์ธ๋”ฉํ•  ํ์˜ ์ด๋ฆ„.

  • exchange: ํ๋ฅผ ๋ฐ”์ธ๋”ฉํ•  ์ต์Šค์ฒด์ธ์ง€์˜ ์ด๋ฆ„.

  • routingKey: ๋ผ์šฐํŒ… ํ‚ค (ํŠนํžˆ direct, topic ์ต์Šค์ฒด์ธ์ง€ ํƒ€์ž…์—์„œ ์‚ฌ์šฉ). fanout ํƒ€์ž…์˜ ์ต์Šค์ฒด์ธ์ง€์—์„œ๋Š” ์ด ๊ฐ’์„ ๋ฌด์‹œํ•œ๋‹ค.

์„ค์ •์„ ํ†ตํ•ด ํ์™€ ์ต์Šค์ฒด์ธ์ง€๋ฅผ ์—ฐ๊ฒฐํ•˜๊ณ  ๋ผ์šฐํŒ… ๊ทœ์น™์— ๋งž๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ํ•ด๋‹น ํ๋กœ ์ „๋‹ฌ๋˜๋„๋ก ํ•œ๋‹ค.

7. ํ๋ฆ„ ์ œ์–ด(flow control)

์ด๋Š” RabbitMQ์˜ QoS(Quality of Service) ์„ค์ •์„ ์œ„ํ•œ ๋ฉ”์„œ๋“œ๋กœ, ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ๊ฐ€ ํ•œ ๋ฒˆ์— ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์‹œ์ง€ ์ˆ˜๋ฅผ ์ œํ•œํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ์ฆ‰, consumer๊ฐ€ ํ•œ ๋ฒˆ์— ๋ฐ›์„ ์ˆ˜ ์žˆ๋Š” ๋ฉ”์‹œ์ง€ ์ˆ˜๋ฅผ ์ œํ•œํ•œ๋‹ค.

์ด๋ฅผ ํ†ตํ•ด์„œ ์‹œ์Šคํ…œ์ด ๊ณผ๋ถ€ํ™” ๋˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€ํ•˜๊ณ , ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ๋ฅผ ํšจ์œจ์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๋Š” ๊ฒƒ์„ ๋ชฉํ‘œ๋กœ ํ•œ๋‹ค.

  • prefetchCount: ํ•œ ๋ฒˆ์— ์†Œ๋น„ํ•  ์ˆ˜ ์žˆ๋Š” ์ตœ๋Œ€ ๋ฉ”์‹œ์ง€ ์ˆ˜๋ฅผ ์ง€์ •ํ•œ๋‹ค. 'chunk'๋ผ๊ณ  ์ƒ๊ฐํ•˜๋ฉด ๋œ๋‹ค.

์ด ๊ฐ’์„ ์„ค์ •ํ•˜๋ฉด RabbitMQ ์„œ๋ฒ„์—์„œ๋Š” ํ•ด๋‹น ์ˆ˜ ๋งŒํผ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ํ•œ ๋ฒˆ์— ์ปจ์Šˆ๋จธ์—๊ฒŒ ์ „๋‹ฌํ•˜๊ณ , ์ปจ์Šˆ๋จธ๊ฐ€ ์‘๋‹ต์„ ๋ณด๋‚ด๊ธฐ ์ „๊นŒ์ง€๋Š” ์ถ”๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•˜์ง€ ์•Š๋Š”๋‹ค.

์‚ฌ์šฉ ์˜ˆ์ œ

์˜ˆ์ œ์˜ Spring AMQP์—์„œ๋Š” ์ €์ˆ˜์ค€ API๋ฅผ ๊ฐ์‹ธ๊ณ  ์žˆ์ง€๋งŒ, @RabbitListenr์˜ ackMode = "MANUAL" ์„ค์ •์€ Channel ๊ฐ์ฒด๋ฅผ ์ง์ ‘ ๋‹ค๋ฃฐ ํ•„์š”๊ฐ€ ์žˆ๋‹ค.

๋‹ค์Œ์€ Spring AMQP์—์„œ Channel ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์˜ˆ์ œ์ด๋‹ค.

Channel ํด๋ž˜์Šค๋Š” RabbitMQ์™€์˜ ํ†ต์‹ ์„ ์œ„ํ•œ ํ•ต์‹ฌ ์ธํ„ฐํŽ˜์ด์Šค์ด๋ฉฐ, ๋ฉ”์‹œ์ง€์˜ ๋ฐœํ–‰, ์†Œ๋น„, ํ™•์ธ ๋ฐ ํ ๊ด€๋ฆฌ์™€ ๊ฐ™์€ ๋ชจ๋“  ์ค‘์š”ํ•œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

Last updated