文章目录  1、kafka确保消息不丢失? 1.1、生产者端确保消息不丢失 1.2、kafka服务端确保消息不丢失 1.3、消费者确保正确无误的消费 2、生产者发送消息 KafkaService 3、UserInfoServiceImpl -> login() 4、service-account - > AccountListener.java   
 
发送模式:发后即忘、同步阻塞确认、异步非阻塞确认 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1) 配置重试:props.put(“retries”, 3)、retries: 3 kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失 消息的log日志文件损坏:搭建kafka集群(副本) 偏移量提交 偏移量重置: 消费者重试:重试主题和死信主题, @RetryableTopic() package  com. atguigu. tingshu. common. service ; import  org. slf4j.  Logger ; 
import  org. slf4j.  LoggerFactory ; 
import  org. springframework. beans. factory. annotation.  Autowired ; 
import  org. springframework. kafka. core.  KafkaTemplate ; 
import  org. springframework. kafka. support.  SendResult ; 
import  org. springframework. stereotype.  Service ; import  java. util. concurrent.  CompletableFuture ; @Service 
public  class  KafkaService  { private  static  final  Logger  logger =  LoggerFactory . getLogger ( KafkaService . class ) ; @Autowired private  KafkaTemplate  kafkaTemplate; public  void  sendMsg ( String  topic,  String  msg) { this . sendMsg ( topic,  null ,  null ,  msg) ; } public  void  sendMsg ( String  topic,  Integer  partition,  String  key,  String  msg) { CompletableFuture < SendResult > =  this . kafkaTemplate. send ( topic,  partition,  key,  msg) ; future. whenCompleteAsync ( ( result,  ex)  ->  { if  ( ex !=  null ) { logger. error ( "生产者发送消息失败!原因:{}" ,  ex. getMessage ( ) ) ; } } ) ; } } whenCompleteAsync:异步完成时的处理、当异步操作完成时此时 service-user 是生产者 发送消息
 
@Slf4j 
@Service 
@SuppressWarnings ( { "unchecked" ,  "rawtypes" } ) 
public  class  UserInfoServiceImpl  extends  ServiceImpl < UserInfoMapper ,  UserInfo > implements  UserInfoService  { @Autowired private  WxMaService  wxMaService; @Autowired private  RedisTemplate  redisTemplate; @Autowired private  UserAccountFeignClient  userAccountFeignClient; @Autowired private  KafkaService  kafkaService; @Override public  Map < String ,  Object > login ( String  code)  { HashMap < String ,  Object > =  new  HashMap < > ( ) ; try  { WxMaJscode2SessionResult  sessionInfo =  this . wxMaService. getUserService ( ) . getSessionInfo ( code) ; String  openid =  sessionInfo. getOpenid ( ) ; UserInfo  userInfo =  this . getOne ( new  LambdaQueryWrapper < UserInfo > ( ) . eq ( UserInfo :: getWxOpenId ,  openid) ) ; if  ( userInfo ==  null )  { userInfo =  new  UserInfo ( ) ; userInfo. setWxOpenId ( openid) ; userInfo. setNickname ( "这家伙太懒" +  IdWorker . getIdStr ( ) ) ; userInfo. setAvatarUrl ( "https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500" ) ; this . save ( userInfo) ; this . kafkaService. sendMsg ( KafkaConstant . QUEUE_USER_REGISTER , userInfo. getId ( ) . toString ( ) ) ; } String  token =  UUID . randomUUID ( ) . toString ( ) ; UserInfoVo  userInfoVo =  new  UserInfoVo ( ) ; BeanUtils . copyProperties ( userInfo,  userInfoVo) ; this . redisTemplate. opsForValue ( ) . set ( RedisConstant . USER_LOGIN_KEY_PREFIX  +  token,  userInfoVo, RedisConstant . USER_LOGIN_KEY_TIMEOUT ,  TimeUnit . SECONDS ) ; map. put ( "token" ,  token) ; return  map; }  catch  ( WxErrorException  e)  { throw  new  GuiguException ( ResultCodeEnum . LOGIN_AUTH ) ; } } } 
 
此时 service-account 是消费者 接收消息
 
@Slf4j 
@Component 
public  class  AccountListener  { @Autowired private  UserAccountService  userAccountService; @RetryableTopic ( backoff =  @Backoff ( 2000 ) ) @KafkaListener ( topics =  KafkaConstant . QUEUE_USER_REGISTER ) public  void  listen ( String  userId,  Acknowledgment  ack) { if  ( StringUtils . isBlank ( userId) )  { ack. acknowledge ( ) ; return ; } this . userAccountService. saveAccount ( Long . valueOf ( userId) ) ; ack. acknowledge ( ) ; } 
}