使用kafka实现秒杀消息队列
首先导入依赖
<!-- Spring Boot integration for Kafka (spring-kafka). No <version> is declared here. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka client library (kafka-clients). No <version> is declared here either. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
然后是kafka的配置,
写在application.yml文件中:
spring:
  # Kafka settings
  kafka:
    listener:
      # Offsets are acknowledged manually by the listener, and committed immediately on ack.
      ack-mode: manual_immediate
      # Number of concurrent listener consumers.
      concurrency: 5
      # Consumer poll timeout (milliseconds).
      poll-timeout: 5000
      # Do not fail application startup when a listened topic does not exist yet.
      missing-topics-fatal: false
    bootstrap-servers: 服务器IP:端口号
    producer:
      # Producer keys must use a Serializer; a Deserializer class here is a misconfiguration.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Upper bound, in bytes, of one batch of records sent to a partition (not a record count).
      batch-size: 16384
      # Total memory, in bytes, the producer may use to buffer records waiting to be sent (32 MB here).
      buffer-memory: 33554432
      # Number of times a failed send is retried.
      retries: 1
KafkaConfig类
/**
 * Producer-side Kafka wiring.
 *
 * <p>Reads the broker address, batch size and buffer memory from the application
 * properties and exposes the producer settings, a {@link ProducerFactory} and a
 * {@link KafkaTemplate} as beans. Only those three properties plus String
 * serializers for key and value are applied here; no other producer property is
 * read by this class.
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    /** Value of {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String producerServers;

    /** Value of {@code spring.kafka.producer.batch-size}. */
    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    /** Value of {@code spring.kafka.producer.buffer-memory}. */
    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     * Builds the raw producer property map.
     *
     * @return producer settings keyed by the {@link ProducerConfig} constant names
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new Hashtable<>();
        // Both the record key and the record value are written as plain strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return the factory used to create Kafka producers
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template that application code uses to send records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
手动指定topic
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the topic used by the flash-sale queue as a {@link NewTopic} bean.
 *
 * <p>NOTE(review): presumably this bean is picked up by Spring's Kafka admin
 * support so the topic is created at startup when missing; confirm.
 */
@Configuration
public class KafkaTopic {

    /** Name of the topic. */
    private static final String TOPIC_NAME = "testTopic";

    /** Number of partitions the topic is declared with. */
    private static final int PARTITION_COUNT = 8;

    /** Replication factor the topic is declared with. */
    private static final short REPLICATION_FACTOR = 1;

    /**
     * Describes the topic: name, partition count and replication factor.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic batchTopic() {
        return new NewTopic(TOPIC_NAME, PARTITION_COUNT, REPLICATION_FACTOR);
    }
}
主题名字为"testTopic",主题中一共分为8个分区,副本因子为1,表示包含leader副本在内,所有的副本个数是1
生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
 * Publishes flash-sale messages to Kafka.
 */
@Component
@Slf4j
public class Sender {

    /** Topic every message is published to. */
    private static final String TOPIC = "testTopic";

    /**
     * Record key attached to every message. This is the key, not a partition
     * number: it is passed as the second argument of
     * {@code KafkaTemplate.send(topic, key, data)}.
     *
     * <p>NOTE(review): because the key is constant, key-based partitioning will
     * place every record on the same partition; confirm this is intended.
     */
    private static final String RECORD_KEY = "1";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Logs the payload and hands it to the Kafka template. The result returned
     * by the template is not inspected.
     *
     * @param data the message payload to publish
     */
    public void send(String data) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("发送消息:{}", data);
        kafkaTemplate.send(TOPIC, RECORD_KEY, data);
    }
}
往testTopic主题发送data,消息的key固定为"1"(send的第二个参数是消息的key,而不是分区号;key相同的消息会被默认分区器分配到同一个分区)
调用生产者
/**
 * Accepts a flash-sale request and queues it on Kafka for later processing.
 *
 * <p>The user and goods id are wrapped in a {@code SeckillMessage}, serialised to
 * JSON and passed to the sender; the method then returns immediately.
 *
 * @param path    value of the {@code {path}} URL placeholder, bound through
 *                {@code @PathVariable}; it is not otherwise used in this method
 * @param user    the user placing the order
 * @param goodsId id of the goods being purchased
 * @return {@code RespBean.success(0)} once the message has been handed to the sender
 */
@RequestMapping(value = "/{path}/doSeckill",method = RequestMethod.POST)
@ResponseBody
public RespBean doSeckill(@PathVariable String path, User user, Long goodsId) {
    // Bundle the goods and user information into one JSON payload for Kafka.
    String payload = JsonUtil.object2JsonStr(new SeckillMessage(user, goodsId));
    sender.send(payload);
    return RespBean.success(0);
}
消费者
import com.xxxx.seckill.pojo.SeckillMessage;
import com.xxxx.seckill.pojo.SeckillOrder;
import com.xxxx.seckill.pojo.User;
import com.xxxx.seckill.service.IGoodsService;
import com.xxxx.seckill.service.IOrderService;
import com.xxxx.seckill.utils.JsonUtil;
import com.xxxx.seckill.vo.GoodsVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Consumes flash-sale messages from Kafka and turns them into orders.
 *
 * <p>NOTE(review): application.yml sets {@code ack-mode: manual_immediate}, but
 * {@link #receive(String)} takes no acknowledgment argument and therefore never
 * acknowledges a record. Confirm the intended ack mode.
 */
@Component
@Slf4j
public class Receiver {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private IGoodsService goodsService;

    @Autowired
    private IOrderService orderService;

    /**
     * Handles one message from {@code testTopic}.
     *
     * <p>The message is parsed into a {@code SeckillMessage}. The order is created
     * only when the goods still have stock and Redis holds no order for this
     * user and goods pair. Messages that cannot be processed (unparseable,
     * missing user or goods id, unknown goods) are logged and dropped instead of
     * failing with a {@link NullPointerException}.
     *
     * @param message JSON text of a {@code SeckillMessage}
     */
    @KafkaListener(topics = "testTopic")
    public void receive(String message) {
        log.info("接收消息:{}", message);

        SeckillMessage seckillMessage = JsonUtil.jsonStr2Object(message, SeckillMessage.class);
        if (seckillMessage == null
                || seckillMessage.getUser() == null
                || seckillMessage.getGoodsId() == null) {
            // The sending controller does not validate user or goodsId, so guard here.
            log.warn("Dropping seckill message without user or goods id: {}", message);
            return;
        }
        Long goodId = seckillMessage.getGoodsId();
        User user = seckillMessage.getUser();

        GoodsVo goodsVo = goodsService.findGoodsVoByGoodsId(goodId);
        if (goodsVo == null) {
            log.warn("Dropping seckill message for unknown goods id: {}", goodId);
            return;
        }
        // Out of stock: nothing to do.
        if (goodsVo.getStockCount() < 1) {
            return;
        }

        // An order already stored in Redis for this user and goods means a repeat purchase.
        SeckillOrder seckillOrder =
                (SeckillOrder) redisTemplate.opsForValue().get("order:" + user.getId() + ":" + goodId);
        if (seckillOrder != null) {
            return;
        }

        // Perform the flash-sale purchase and create the order.
        orderService.seckill(user, goodsVo);
    }
}
消费者监听到testTopic中有数据时,先检查库存是否充足、该用户是否已经下过单,都通过后才创建订单信息
版权声明:本文为weixin_51656756原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。