<p id="g32nn"></p>
    1. <acronym id="g32nn"><strong id="g32nn"></strong></acronym>
      <pre id="g32nn"></pre>

      <table id="g32nn"><option id="g32nn"></option></table>

          KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)
          2022-09-06 22:42:16


          文章目錄

          一、基礎集成
          1. 技術選型

          軟件/框架

          版本

          jdk

          1.8.0_202

          springboot

          2.5.4

          kafka server

          kafka_2.12-2.8.0

          kafka client

          2.7.1

          zookeeper

          3.7.0

          2. 導入依賴
          <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
          <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
          </dependency>
          3. kafka配置

          properties版本

          spring.application.name=springboot-kafka
          server.port=8080
          # kafka 配置
          spring.kafka.bootstrap-servers=node1:9092

          # producer 配置
          spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
          spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
          # 生產者每個批次最多方多少條記錄
          spring.kafka.producer.batch-size=16384
          # 生產者一端總的可用緩沖區大小,此處設置為32M * 1024 * 1024
          spring.kafka.producer.buffer-memory=33544432

          # consumer 配置
          spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
          spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
          spring.kafka.consumer.group-id=springboot-consumer-02
          # earliest - 如果找不到當前消費者的有效偏移量,則自動重置向到最開始
          spring.kafka.consumer.auto-offset-reset=earliest
          # 消費者的偏移量是自動提交還是手動提交,此處自動提交偏移量
          spring.kafka.consumer.enable-auto-commit=true
          # 消費者偏移量自動提交時間間隔
          spring.kafka.consumer.auto-commit-interval=1000

          yml版本項目內部配置

          server:
          port: 8002
          spring:
          application:
          # 應用名稱
          name: ly-kafka
          profiles:
          # 環境配置
          active: dev
          cloud:
          nacos:
          discovery:
          # 服務注冊地址
          server-addr: nacos.server.com:8848
          config:
          # 配置中心地址
          server-addr: nacos.server.com:8848
          # 配置文件格式
          file-extension: yml
          # 共享配置
          shared-configs:
          - application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

          nacos-config 服務端配置

          在這里插入代碼片
          4. auto-offset-reset 簡述

          關于
          auto.offset.reset 配置有3個值可以設置,分別如下:

          earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
          latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
          none: topic 各分區都存在已提交的 offset 時,從 offset 后開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;
          默認建議用 earliest, 設置該參數后 kafka出錯后重啟,找到未消費的offset可以繼續消費。

          而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啟kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就不管了。

          none 這個設置沒有用過,兼容性太差,經常出問題。

          5. 新增一個訂單類

          模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其他服務消費

          package com.gblfy.kafka.entity;

          import lombok.AllArgsConstructor;
          import lombok.Builder;
          import lombok.Data;
          import lombok.NoArgsConstructor;

          import java.time.LocalDateTime;

          @Data
          @Builder
          @AllArgsConstructor
          @NoArgsConstructor
          public class Order {
          /**
          * 訂單id
          */
          private long orderId;
          /**
          * 訂單號
          */
          private String orderNum;
          /**
          * 訂單創建時間
          */
          private LocalDateTime createTime;
          }
          6. 生產者(異步)
          package com.gblfy.lykafka.provider;

          import com.alibaba.fastjson.JSONObject;
          import com.gblfy.common.constant.KafkaTopicConstants;
          import com.gblfy.common.entity.Order;
          import org.apache.kafka.clients.producer.RecordMetadata;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.kafka.core.KafkaTemplate;
          import org.springframework.kafka.support.SendResult;
          import org.springframework.stereotype.Service;
          import org.springframework.util.concurrent.ListenableFuture;
          import org.springframework.util.concurrent.ListenableFutureCallback;

          import java.time.LocalDateTime;

          /**
          * Kafka生產者
          *
          * @author gblfy
          * @date 2021-09-28
          */
          @Service
          public class KafkaProvider {
          private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);

          @Autowired
          private KafkaTemplate<String, String> kafkaTemplate;

          public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
          // 構建一個訂單類
          Order order = Order.builder()
          .orderId(orderId)
          .orderNum(orderNum)
          .createTime(createTime)
          .build();
          // 發送消息,訂單類的 json 作為消息體
          ListenableFuture<SendResult<String, String>> future =
          kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));

          // 監聽回調
          future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
          @Override
          public void onFailure(Throwable e) {
          log.info("發送消息失敗: {}", e.getMessage());
          }

          @Override
          public void onSuccess(SendResult<String, String> result) {
          RecordMetadata metadata = result.getRecordMetadata();
          log.info("發送的主題:{} ,發送的分區:{} ,發送的偏移量:{} ",
          metadata.topic(), metadata.partition(), metadata.offset());
          }
          });
          }
          }
          7. 消費者
          package com.gblfy.lykafka.controller;

          import com.gblfy.lykafka.provider.KafkaProvider;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;

          import java.time.LocalDateTime;

          @RestController
          @RequestMapping("/kafka")
          public class KafkaProviderController {

          @Autowired
          private KafkaProvider kafkaProvider;

          @GetMapping("/sendMQ")
          public String sendMQContent() {
          kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());
          return "OK";
          }
          }

          通過 @KafkaListener注解,我們可以指定需要監聽的 topic 以及 groupId, 注意,這里的 topics 是個數組,意味著我們可以指定多個 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

          注意:消息發布者的 TOPIC 需要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

          8. kafka配置類
          package com.gblfy.common.constant;

          public class KafkaTopicConstants {
          //kafka發送消息主題
          public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";

          // kafka消費者組需要和yml文件中的 kafka.consumer.group-id的值保持一致
          public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
          }
          9.單元測試

          新建單元測試,功能測試消息發布,以及消費。

          package com.gblfy.kafka;

          import com.gblfy.kafka.controller.KafkaProvider;
          import org.junit.jupiter.api.Test;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.test.context.SpringBootTest;

          import java.time.LocalDateTime;
          import java.util.UUID;
          import java.util.concurrent.TimeUnit;

          @SpringBootTest
          class KafkaSpringbootApplicationTests {

          @Autowired
          private KafkaProvider kafkaProvider;

          @Test
          public void sendMessage() throws InterruptedException {
          // 發送 1000 個消息
          for (int i = 0; i < 1000; i++) {
          long orderId = i+1;
          String orderNum = UUID.randomUUID().toString();
          kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
          }
          TimeUnit.MINUTES.sleep(1);
          }
          }
          9. 效果圖

          KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)_java


          KAFKA SpringBoot2 Nacos 消息異步發送和消費消息(進階篇)_ide_02

          10. 源碼地址

          ??https://gitee.com/gb_90/kafka-parent??

          11.微服務專欄

          ??https://gitee.com/gb_90/micro-service-parent??


          本文摘自 :https://blog.51cto.com/g


          更多科技新聞 ......

          97久久久久人妻精品专区_国产成人精品视频导航_国产色诱视频在线播放网站_97午夜理论电影影院
          <p id="g32nn"></p>
          1. <acronym id="g32nn"><strong id="g32nn"></strong></acronym>
            <pre id="g32nn"></pre>

            <table id="g32nn"><option id="g32nn"></option></table>