Implementing a seckill message queue with Kafka

First, add the dependencies

        <!-- Spring Boot integration with Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Kafka client (kafka-clients) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

Next comes the Kafka configuration.

In application.yml

spring:
  # Kafka configuration
  kafka:
    listener:
      ack-mode: manual_immediate
      concurrency: 5
      poll-timeout: 5000
      missing-topics-fatal: false

    bootstrap-servers: <broker IP>:<port>

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Maximum size of a producer batch, in bytes
      batch-size: 16384
      # Total memory the producer can use to buffer records waiting to be sent (32 MB here)
      buffer-memory: 33554432
      # Number of retries when a send fails
      retries: 1
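The snippet above only configures the listener and the producer, yet ack-mode: manual_immediate and the @KafkaListener consumer shown further down also depend on consumer settings (group id, deserializers). Those are not shown in the original post; a minimal consumer block under the same spring.kafka key might look like this (the group id is my placeholder, not from the post):

    consumer:
      # Consumer group for the seckill listener (name is an assumption)
      group-id: seckill-group
      # Mirror the producer's String serializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Auto-commit is disabled because offsets are committed manually (ack-mode: manual_immediate)
      enable-auto-commit: false
      auto-offset-reset: earliest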

The KafkaConfig class

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;
    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.bootstrap-servers}")
    private String producerServers;

    // Producer configuration properties
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Producer factory
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // KafkaTemplate used to send messages
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Declaring the topic manually


import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopic {

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("testTopic",8,(short)1);
    }
}

The topic is named "testTopic" and has 8 partitions with a replication factor of 1, meaning there is only a single copy of the data in total, the leader replica itself.
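As a side note, spring-kafka 2.3+ also provides TopicBuilder, which expresses the same declaration a little more readably. The class below is an equivalent alternative to (not in addition to) the KafkaTopic class above:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Equivalent alternative to the KafkaTopic class above; use one or the other, not both.
@Configuration
public class KafkaTopicBuilderConfig {

    @Bean
    public NewTopic batchTopic() {
        // Same topic: "testTopic", 8 partitions, replication factor 1
        return TopicBuilder.name("testTopic")
                .partitions(8)
                .replicas(1)
                .build();
    }
}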

Producer

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Sender {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;


    public void send(String data) {
        log.info("发送消息:"+data);
        kafkaTemplate.send("testTopic","1",data);
    }

}

This sends data to the testTopic topic with message key "1". Note that in this send overload the second argument is the record key, not a partition number: Kafka hashes the key to choose a partition, so all messages with key "1" land in the same partition, but not necessarily partition 1.
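If the intent really is to pin messages to a specific partition, KafkaTemplate has an overload that takes the partition number explicitly. The method below is my variant, not the original author's code; it also attaches a callback to log the send result, assuming Spring Kafka 2.x where send() returns a ListenableFuture:

    // Hypothetical variant of Sender.send(): targets partition 1 explicitly and logs the outcome
    public void sendToPartition(String data) {
        // send(topic, partition, key, value): the partition is chosen explicitly here
        kafkaTemplate.send("testTopic", 1, "1", data)
                .addCallback(
                        result -> log.info("Sent to partition {} at offset {}",
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset()),
                        ex -> log.error("Send failed", ex));
    }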

Calling the producer

 	@RequestMapping(value = "/{path}/doSeckill",method = RequestMethod.POST)
    @ResponseBody
//    PathVariable可以将URL中占位符参数绑定到控制器处理方法的入参中
    public RespBean doSeckill(@PathVariable String path, User user, Long goodsId) {
        SeckillMessage seckillMessage = new SeckillMessage(user, goodsId);
//        将商品信息和用户信息发送给kafka
        sender.send(JsonUtil.object2JsonStr(seckillMessage));
        return RespBean.success(0);

    }
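The SeckillMessage carried on the topic is just a small DTO wrapping the user and the goods id. Its definition is not shown in the post; a minimal sketch consistent with how it is used by Sender and Receiver could be:

// Hypothetical reconstruction of com.xxxx.seckill.pojo.SeckillMessage,
// matching the constructor and getters used above; not the author's actual class.
package com.xxxx.seckill.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillMessage {
    private User user;
    private Long goodsId;
}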

Consumer


import com.xxxx.seckill.pojo.SeckillMessage;
import com.xxxx.seckill.pojo.SeckillOrder;
import com.xxxx.seckill.pojo.User;
import com.xxxx.seckill.service.IGoodsService;
import com.xxxx.seckill.service.IOrderService;
import com.xxxx.seckill.utils.JsonUtil;
import com.xxxx.seckill.vo.GoodsVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Receiver {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private IGoodsService goodsService;

    @Autowired
    private IOrderService orderService;

    // Because ack-mode is manual_immediate, the listener takes an Acknowledgment
    // and commits the offset explicitly; otherwise offsets would never be committed.
    @KafkaListener(topics = "testTopic")
    public void receive(String message, Acknowledgment ack) {
        log.info("Received message: " + message);
        SeckillMessage seckillMessage = JsonUtil.jsonStr2Object(message, SeckillMessage.class);
        Long goodsId = seckillMessage.getGoodsId();
        User user = seckillMessage.getUser();
        GoodsVo goodsVo = goodsService.findGoodsVoByGoodsId(goodsId);
        // If the stock is exhausted, acknowledge and return without creating an order
        if (goodsVo.getStockCount() < 1) {
            ack.acknowledge();
            return;
        }
        // Look up an existing order for this user and goods in Redis
        SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.opsForValue()
                .get("order:" + user.getId() + ":" + goodsId);
        // If one exists, acknowledge and return to prevent duplicate orders
        if (seckillOrder != null) {
            ack.acknowledge();
            return;
        }
        // Perform the seckill and create the order
        orderService.seckill(user, goodsVo);
        // Manually commit the offset, matching ack-mode: manual_immediate
        ack.acknowledge();
    }
}

When the listener receives a message from testTopic, it checks the remaining stock and the duplicate-order marker in Redis, and only then creates the order.

