| package com.dzj.kafka_streaming.listener;

 import com.alibaba.fastjson.JSONObject;
 import com.dzj.kafka_streaming.dto.TagNameTypeInfo;
 import com.dzj.kafka_streaming.service.ContentTagRelationService;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import javax.annotation.Resource;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 /**
  * "immersive_streaming_" + userId; 这是旧的key,需要清除
  */
 @Component
 publicclassMessageListener {
     @Autowired
     privateContentTagRelationService relationService;
     @Resource
     privateRedisTemplate<String, Object> redisTemplate;
     privatefinalString TOPIC_NAME = "event-trace-log";
     // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")
     @KafkaListener(topics = {TOPIC_NAME})
     publicvoidlistener(ConsumerRecord<String,String> record)  {
         //获取消息
         String message = record.value();
         //消息偏移量
         longoffset = record.offset();
         String redisKeyPrefix = "kafka:user_short_video_streaming:_";
         JSONObject dataJson = parseJson(message);
         String eventCode = dataJson.getString("eventCode");
         if("145001".equals(eventCode)){
             // 测试环境------------------------------------------------------------------------------------------
             // 目前只关注沉浸式中得数据
             String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");
             String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");
             Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");
             String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");
             String userId = dataJson.getJSONObject("eventBody").getString("userId");
             String appType = dataJson.getJSONObject("eventBody").getString("appType");
             // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));
             /**
              * 写入Redis
              * redis存储结构: key = List(5),是一个定长为5,右进左出的队列
              * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个
              */
             String key = redisKeyPrefix + userId;
     //        String key = "immersive_streaming_wyp0001";
             // 定义Redis队列写入的结构
             JSONObject redisListItem = newJSONObject();
             redisListItem.put("resourceId",resourceId);
             redisListItem.put("resourceType",resourceType);
             redisListItem.put("duration",duration);
             redisListItem.put("actionCode",actionCode);
             redisListItem.put("appType",appType);
             String redisListItemString = redisListItem.toJSONString();
             if(redisTemplate.opsForList().size(key) >= 100){
                 Object leftPop = redisTemplate.opsForList().leftPop(key);
                 redisTemplate.opsForList().rightPush(key, redisListItemString);
                 System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));
             }else{
                 if(!resourceId.isEmpty() && !resourceType.isEmpty()){
                     redisTemplate.opsForList().rightPush(key, redisListItemString);
                     Long size = redisTemplate.opsForList().size(key);
                     System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);
                     System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));
                 }
             }
         }
     }
     
     /**
      * 解析json,解码功能
      */
     publicJSONObject parseJson(String message) {
         JSONObject messageJson = JSONObject.parseObject(message);
         String dataString = messageJson.getString("data");
         // --------------------base64解码字符串--------------------
         String data_string = "";
         finalBase64.Decoder decoder = Base64.getDecoder();
         try{
             data_string = newString(decoder.decode(dataString), "UTF-8");
         }catch(Exception e){
             System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson"+ e);
         }
         // string转换为json,只取eventCode = '145001'沉浸式的
         JSONObject dataJson = JSONObject.parseObject(data_string);
         returndataJson;
     }
     /**
      * 从数据库查询
      * @param resourceId
      * @param resourceType
      * @return
      */
     publicList<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){
         List<TagNameTypeInfo> tagNameTypeInfos = newArrayList<>();
         try{
             tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);
         } catch(Exception e){
             System.out.println("【ERROR】"+ resourceId + "&"+ resourceType + "在数据库中查询不到.......");
         }
         returntagNameTypeInfos;
     }
 }
 |