久久久久久久视色,久久电影免费精品,中文亚洲欧美乱码在线观看,在线免费播放AV片

<center id="vfaef"><input id="vfaef"><table id="vfaef"></table></input></center>

    <p id="vfaef"><kbd id="vfaef"></kbd></p>

    
    
    <pre id="vfaef"><u id="vfaef"></u></pre>

      <thead id="vfaef"><input id="vfaef"></input></thead>

    1. 站長資訊網
      最全最豐富的資訊網站

      springboot + rabbitmq如何用了消息確認

      springboot + rabbitmq如何用了消息確認

      最近部門號召大伙多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術交流還是很有助于個人成長的。

      于是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點KPI,就是想和大伙一起學習學習!

      springboot + rabbitmq如何用了消息確認

      相關學習推薦:Java視頻教程

      這次我分享的是 springboot + rabbitmq 如何實現(xiàn)消息確認機制,以及在實際開發(fā)中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。

      可以看到使用了 RabbitMQ 以后,我們的業(yè)務鏈路明顯變長了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場景也增加了。例如:

      • 消息生產者 – > rabbitmq服務器(消息發(fā)送失敗)

      • rabbitmq服務器自身故障導致消息丟失

      • 消息消費者 – > rabbitmq服務(消費消息失?。?/p>

      springboot + rabbitmq如何用了消息確認
      所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認機制以后,盡管很大程度上保證了消息的準確送達,但由于頻繁的確認交互,rabbitmq 整體效率變低,吞吐量下降嚴重,不是非常重要的消息真心不建議你用消息確認機制。


      下邊我們先來實現(xiàn)springboot + rabbitmq消息確認機制,再對遇到的問題做具體分析。

      一、準備環(huán)境

      1、引入 rabbitmq 依賴包

      <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId></dependency>

      2、修改 application.properties 配置

      配置中需要開啟 發(fā)送端消費端 的消息確認。

      spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest  # 發(fā)送者開啟 confirm 確認機制 spring.rabbitmq.publisher-confirms=true# 發(fā)送者開啟 return 確認機制 spring.rabbitmq.publisher-returns=true#################################################### # 設置消費端手動 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重試 spring.rabbitmq.listener.simple.retry.enabled=true

      3、定義 Exchange 和 Queue

      定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,并將隊列綁定在交換機上。

      @Configurationpublic class QueueConfig {      @Bean(name = "confirmTestQueue")     public Queue confirmTestQueue() {         return new Queue("confirm_test_queue", true, false, false);     }      @Bean(name = "confirmTestExchange")     public FanoutExchange confirmTestExchange() {         return new FanoutExchange("confirmTestExchange");     }      @Bean    public Binding confirmTestFanoutExchangeAndQueue(             @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,             @Qualifier("confirmTestQueue") Queue confirmTestQueue) {         return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);     }}

      rabbitmq 的消息確認分為兩部分:發(fā)送消息確認 和 消息接收確認。

      springboot + rabbitmq如何用了消息確認

      二、消息發(fā)送確認

      發(fā)送消息確認:用來確認生產者 producer 將消息發(fā)送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

      消息從 producerrabbitmq broker有一個 confirmCallback 確認模式。

      消息從 exchangequeue 投遞失敗有一個 returnCallback 退回模式。

      我們可以利用這兩個Callback來確保消的100%送達。

      1、 ConfirmCallback確認模式

      消息只要被 rabbitmq broker 接收到就會觸發(fā) confirmCallback 回調 。

      @Slf4j @Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {      @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {          if (!ack) {             log.error("消息發(fā)送異常!");         } else {             log.info("發(fā)送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);         }     }}

      實現(xiàn)接口 ConfirmCallback ,重寫其confirm()方法,方法內有三個參數(shù)correlationDataack、cause。

      • correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。
      • ack:消息投遞到broker 的狀態(tài),true表示成功。
      • cause:表示投遞失敗的原因。

      但消息被 broker 接收到只能表示已經到達 MQ服務器,并不能保證消息一定會被投遞到目標 queue 里。所以接下來需要用到 returnCallback

      2、 ReturnCallback 退回模式

      如果消息未能投遞到目標 queue 里將觸發(fā)回調 returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當前消息的詳細投遞數(shù)據,方便后續(xù)做重發(fā)或者補償?shù)炔僮鳌?/p>

      @Slf4j @Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {      @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {         log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);     }}

      實現(xiàn)接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數(shù)message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey(隊列)。

      下邊是具體的消息發(fā)送,在rabbitTemplate中設置 ConfirmReturn 回調,我們通過setDeliveryMode()對消息做持久化處理,為了后續(xù)測試創(chuàng)建一個 CorrelationData對象,添加一個id10000000000

      @Autowired    private RabbitTemplate rabbitTemplate;      @Autowired    private ConfirmCallbackService confirmCallbackService;      @Autowired    private ReturnCallbackService returnCallbackService;      public void sendMessage(String exchange, String routingKey, Object msg) {          /**          * 確保消息發(fā)送失敗后可以重新返回到隊列中          * 注意:yml需要配置 publisher-returns: true          */         rabbitTemplate.setMandatory(true);          /**          * 消費者確認收到消息后,手動ack回執(zhí)回調處理          */         rabbitTemplate.setConfirmCallback(confirmCallbackService);          /**          * 消息投遞到隊列失敗回調處理          */         rabbitTemplate.setReturnCallback(returnCallbackService);          /**          * 發(fā)送消息          */         rabbitTemplate.convertAndSend(exchange, routingKey, msg,                 message -> {                     message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                     return message;                 },                 new CorrelationData(UUID.randomUUID().toString()));     }

      三、消息接收確認

      消息接收確認要比消息發(fā)送確認簡單一點,因為只有一個消息回執(zhí)(ack)的過程。使用@RabbitHandler注解標注的方法要增加 channel(信道)、message 兩個參數(shù)。

      @Slf4j @Component @RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 {      @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {          try {             log.info("小富收到消息:{}", msg);              //TODO 具體業(yè)務              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);          }  catch (Exception e) {              if (message.getMessageProperties().getRedelivered()) {                  log.error("消息已重復處理失敗,拒絕再次接收...");                  channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息             } else {                  log.error("消息即將再次返回隊列處理...");                  channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);              }         }     }}

      消費消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。

      1、basicAck

      basicAck:表示成功確認,使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。

      void basicAck(long deliveryTag, boolean multiple)

      deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nackreject等操作。

      multiple:是否批量確認,值為 true 則會一次性 ack所有小于當前消息 deliveryTag 的消息。

      舉個栗子: 假設我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發(fā)第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

      2、basicNack

      basicNack :表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列。

      void basicNack(long deliveryTag, boolean multiple, boolean requeue)

      deliveryTag:表示消息投遞序號。

      multiple:是否批量確認。

      requeue:值為 true 消息將重新入隊列。

      3、basicReject

      basicReject:拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似。

      void basicReject(long deliveryTag, boolean requeue)

      deliveryTag:表示消息投遞序號。

      requeue:值為 true 消息將重新入隊列。

      四、測試

      發(fā)送消息測試一下消息確認機制是否生效,從執(zhí)行結果上看發(fā)送者發(fā)消息后成功回調,消費端成功的消費了消息。
      springboot + rabbitmq如何用了消息確認
      用抓包工具Wireshark 觀察一下rabbitmq amqp協(xié)議交互的變化,也多了 ack 的過程。
      springboot + rabbitmq如何用了消息確認

      五、踩坑日志

      1、不消息確認

      這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。

      開啟消息確認機制,消費消息別忘了channel.basicAck,否則消息會一直存在,導致重復消費。
      springboot + rabbitmq如何用了消息確認

      2、消息無限投遞

      在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業(yè)務邏輯后確認消息, int a = 1 / 0 發(fā)生異常后將消息重新投入隊列。

      @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {          try {             log.info("消費者 2 號收到:{}", msg);              int a = 1 / 0;              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);          } catch (Exception e) {              channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);         }     }

      但是有個問題是,業(yè)務代碼一旦出現(xiàn) bug 99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執(zhí)行,導致了死循環(huán)。

      springboot + rabbitmq如何用了消息確認

      本地的CPU被瞬間打滿了,大家可以想象一下當時在生產環(huán)境導致服務死機,我是有多慌。

      springboot + rabbitmq如何用了消息確認
      而且rabbitmq management 只有一條未被確認的消息。

      springboot + rabbitmq如何用了消息確認

      經過測試分析發(fā)現(xiàn),當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

      消費者會立刻消費這條消息,業(yè)務處理再拋出異常,消息再重新入隊,如此反復進行。導致消息隊列處理出現(xiàn)阻塞,導致正常消息也無法運行。

      而我們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時我們再次發(fā)送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業(yè)務的進行。

      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新發(fā)送消息到隊尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),                     message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,                     JSON.toJSONBytes(msg));

      但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優(yōu)化設置了消息重試次數(shù),達到了重試上限以后,手動確認,隊列刪除此消息,并將消息持久化入MySQL并推送報警,進行人工處理和定時任務做補償。

      3、重復消費

      如何保證 MQ 的消費是冪等性,這個需要根據具體業(yè)務而定,可以借助MySQL、或者redis 將消息持久化,通過再消息中的唯一性屬性校驗。

      贊(0)
      分享到: 更多 (0)
      網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號