RabbitMQ چیست؟ استفاده از RabbitMQ در Spring Boot

قبل هرچیز یک نگاه به RabbitMQ بندازیم :)

خب RabbitMQ یک نرم افزار رایج انتقال پیام(messaging broker) بین برنامه ها و سیستم هاست که به آن ها اجازه می دهد به یکدیگر متصل شوند و ارتباط برقرار کنند. این یک روش رایج برای سرویس ها در سیستم های مبتنی بر میکروسرویس(microservices-based systems) است که به‌صورت نامتقارن از طریق ارسال پیام و عملیات صف بندی ارتباط برقرار کنند.

برای اینکه بیشتر درمورد RabbitMQ بدونید و نحوه ی کار اون آشنا بشید یه سر به این مقاله برنید

https://virgool.io/@mabdi/rabbitmq-%DA%86%DB%8C%D8%B3%D8%AA-%D9%88-%DA%86%D9%87-%D8%A7%D8%B3%D8%AA%D9%81%D8%A7%D8%AF%D9%87-%D8%A7%DB%8C-%D8%AF%D8%A7%D8%B1%D8%AF-lkdktial4v3g

یه توضیح کوچیک در باره نحوه ی کار RabbitMQ اینجا بهتون میدم، بعد میریم سراغ Spring Boot

پیام رسانی در RabbitMQ شامل موارد زیر است:

  • producer یک برنامه کاربری است که پیام ها را به RabbitMQ ارسال می کند. پیام ها مستقیماً به یک صف(queue) ارسال نمی شوند.در اینجا، producer پیام را به یک exchange ارسال می کند. exchange مسئولیت مسیریابی پیام به صف های(queues) مختلف را بر عهده دارند
    به زبان ساده producer برنامه ای است که پیام های اولیه را تولید میکنه و به RabbitMQ ارسال می کند پیام ها ابتدا در exchange مشخص میشود که وارد کدام صف شوند
  • صف(queue) یک بافر است که در داخل RabbitMQ قرار دارد تا پیام هایی را که تولیدکننده می فرستد و گیرنده دریافت می کند ذخیره کند.
  • consumer یک برنامه کاربردی کاربر است که پیام ها را از RabbitMQ دریافت می کند و سپس آنها را پردازش می کند.

در تصویر زیر این فرایند را مشاهده می کنید

نصب RabbitMQ

حالا که با RabbitMQ آشنا شدیم وقتشه تا اونو نصب کنیم، البته من برای نصب و استفاده از داکر استفاده میکنم پیشنهاد می کنم شماهم بجای نصب خود نرم افزار از کانتینرهای داکر استفاده کنید.

کار سختی نیست کافیه داکر روی سیستم نصب داشته باشید و دستور زیر را در cmd یا terminal اجرا کنید

docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

حالا اگر داخل مرورگر http://localhost:15672 بزنید محیط مدیریت RabbitMQ براتون باز میشه و میتونید با یوزر guest و پسورد guest وارد شوید. هرچند ما با این محیط فعلا کاری نداریم اما میتونید مفاهیمی که بالا گفتیم اینجا مشاهده کنید.

سورس پروژه

پروژه ای که در ادامه قراره توضیح بدیمو میتونید از لینک زیر دریافت کنید :)

https://github.com/sheikhoo/RabbitMQ-SpringBoot-Example

آماده سازی

ایجاد پروژه و Dependencies

وقتشه بریم سراغ SpringBoot یک پروژه جدید ایجاد کنید، برای استفاده از RabbitMQ نیاز هستش تا کتابخانه spring-boot-starter-amqp را به پروژه خودمون اضافه کنیم. برای اینکار کد زیر را در فایل pom.xml قرار دهید

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

ایجاد Model

یک کلاس POJO که میخواهیم اشیاء آن را به عنوان پیام مبادله (ارسال/دریافت) کنیم ایجاد میکنیم. در اینجا من اسم کلاسمو Message میزارم شما میتونید هر مدلی که دوست دارید بسازید مثلا User، Student، Log یا هرچیز دیگه ای

package ir.sheikhoo.rabbitmqspringboot.model;
public class Message {
    private String title;
    private String text;
    private String sender;
    // ... getter و setter هارو اینجا قرار بدید
}

application.properties

فایل پیکربندی(application.properties) باید به شکل زیر باشد:

server.port= 8080
spring.rabbitmq.host= localhost
spring.rabbitmq.port= 5672
spring.rabbitmq.username= guest
spring.rabbitmq.password= guest
queue.name= example_queue
fanout.exchange= example_exchange

خط اول port که قراره برنامه ما روی اون اجرا بشه رو مشخص کردیم، 4 خط بعد هم اطلاعات RabbitMQ را برای اتصال به اونو به برناممون دادیم.

خط 5 یک اسم برای صف یا همون queue انتخاب کردیم که ما در اینجا اسمشو example_queue گذاشتیم و خط 6 هم یک اسم برای exchange انتخاب کردیم که اینجا ما اونو example_exchange گذاشتیم.

تنظیمات RabbitMQ

یک کلاس جاوا جدید برای RabbitMQ configuration به نام RabbitConfiguration.java ایجاد می کنیم

نکته: انوتیشن @Configuration در بالا کلاس فراموش نکنید

package ir.sheikhoo.rabbitmqspringboot.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {
    @Value(&quot${fanout.exchange}&quot)
    private String fanoutExchange;

    @Value(&quot${queue.name}&quot)
    private String queueName;

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }

    @Bean
    Binding binding(Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

بیایید نگاهی دقیق به این کلاس پیکربندی بیندازیم:

انوتیشن @Value حاوی نام queue و exchange است که از application.properties می خواند.

تابع queue() جهت ایجاد یک صف جدید برای ذخیره Messageهای ما بکار میرود.

تابع fanoutExchange() جهت ایجاد یک exchange جدید از نوع FanoutExchange استفاده می شود. FanoutExchange فقط تمام پیام هایی را که دریافت می کند به تمام صف هایی که می شناسد پخش می کند.

تابع binding() صف و exchange را به هم متصل می کند.

ساخت Producer

خب Producer که یادتون هست بالا درموردش صحبت کردیم، حال میخواهیم یک Producer برای ارسال پیام های از جنس Message به صف RabbitMQ ایجاد کنیم اسم این کلاس را QueueProducer قرار می دهیم

ما از درون producer و consumer استفاده میکنیم که ارائه شده توسط چارچوب Spring Boot است.

package ir.sheikhoo.rabbitmqspringboot.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import ir.sheikhoo.rabbitmqspringboot.model.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class QueueProducer {
    @Value(&quot${fanout.exchange}&quot)
    private String fanoutExchange;

    private final RabbitTemplate rabbitTemplate;
    @Autowired
    public QueueProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    public void produce(Message message) throws JsonProcessingException {
        rabbitTemplate.setExchange(fanoutExchange);
        rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(message));
    }
}

تابع convertAndSend() با استفاده از ObjectMapper مدل ما را به JSON تبدیل میکند تا بتوان درون صف آن را ذخیره نماییم.

ساخت Consumer

برای دریافت پیام ها() از صف به Consumer نیاز داریم اسم این کلاس QueueConsumer میزاریم

package ir.sheikhoo.rabbitmqspringboot.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import ir.sheikhoo.rabbitmqspringboot.model.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class QueueConsumer {
    private final RabbitTemplate rabbitTemplate;

    @Value(&quot${queue.name}&quot)
    private String queueName;

    @Autowired
    public QueueConsumer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    private String receiveMessage() {
        String message = (String) rabbitTemplate.receiveAndConvert(queueName);
        return message;
    }

    public Message processMessage() throws JsonProcessingException {
        String message = receiveMessage();
        return new ObjectMapper().readValue(message, Message.class);
    }
}

ساخت کنترلر

با ساخت یک controller و قرار دادن همه بخش های کنار هم میتوانیم از برنامه که نوشتیم استفاده کنیم

package ir.sheikhoo.rabbitmqspringboot.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import ir.sheikhoo.rabbitmqspringboot.config.QueueConsumer;
import ir.sheikhoo.rabbitmqspringboot.config.QueueProducer;
import ir.sheikhoo.rabbitmqspringboot.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;

@Controller
@RequestMapping(&quot/&quot)
public class LogsController {

    @Autowired
    private QueueProducer queueProducer;
    @Autowired
    private QueueConsumer queueConsumer;

    @GetMapping(&quotgetMessage&quot)
    public ResponseEntity<?> getMessage() throws JsonProcessingException {
        Message message = queueConsumer.processMessage();
        return new ResponseEntity<Message>(message, HttpStatus.OK);
    }

    @PostMapping(&quotsendMessage&quot)
    public ResponseEntity<?> sendMessage(@RequestBody Message message) throws
JsonProcessingException {
        queueProducer.produce(message);
        return new ResponseEntity<Message>(HttpStatus.CREATED);
    }
}

بعد از اجرا پروژه با استفاده از postman میتوانید درخواست POST و GET پیام ها را به صف ارسال نمایید یا از صف استخراج کنید

ارسال و قرار دادن پیام به صف:

[POST] http://localhost:8080/sendMessage

Body:

{ 
     &quottitle&quot:&quotHello,RabbitMQ&quot, 
     &quottext&quot:&quotThis is my message&quot,
     &quotsender&quot:&quotadmin&quot 
}

دریافت پیام از صف:

[GET] http://localhost:8080/getMessage