| package com.dzj.kafka_streaming.listener; import com.dzj.kafka_streaming.dto.TagNameTypeInfo; import com.dzj.kafka_streaming.service.ContentTagRelationService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Base64; import java.util.List; /** * "immersive_streaming_" + userId; 这是旧的key,需要清除 */ @Component public class MessageListener { @Autowired private ContentTagRelationService relationService; @Resource private RedisTemplate<String, Object> redisTemplate; private final String TOPIC_NAME = "event-trace-log"; // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup") @KafkaListener(topics = {TOPIC_NAME}) public void listener(ConsumerRecord<String,String> record) { //获取消息 String message = record.value(); //消息偏移量 long offset = record.offset(); String redisKeyPrefix = "kafka:user_short_video_streaming:_"; JSONObject dataJson = parseJson(message); String eventCode = dataJson.getString("eventCode"); if ("145001".equals(eventCode)){ // 测试环境------------------------------------------------------------------------------------------ // 目前只关注沉浸式中得数据 String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId"); String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType"); Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration"); String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode"); String userId = dataJson.getJSONObject("eventBody").getString("userId"); String appType = dataJson.getJSONObject("eventBody").getString("appType"); // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody")); /** * 写入Redis * redis存储结构: key = List(5),是一个定长为5,右进左出的队列 * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个 */ String key = redisKeyPrefix + userId; // String key = "immersive_streaming_wyp0001"; // 定义Redis队列写入的结构 JSONObject redisListItem = new JSONObject(); redisListItem.put("resourceId",resourceId); redisListItem.put("resourceType",resourceType); redisListItem.put("duration",duration); redisListItem.put("actionCode",actionCode); redisListItem.put("appType",appType); String redisListItemString = redisListItem.toJSONString(); if (redisTemplate.opsForList().size(key) >= 100){ Object leftPop = redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().rightPush(key, redisListItemString); System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains: "+ redisTemplate.opsForList().range(key,0, -1)); }else { if (!resourceId.isEmpty() && !resourceType.isEmpty()){ redisTemplate.opsForList().rightPush(key, redisListItemString); Long size = redisTemplate.opsForList().size(key); System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one: "+ size + redisListItemString); System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains: "+ redisTemplate.opsForList().range(key,0, -1)); } } } } /** * 解析json,解码功能 */ public JSONObject parseJson(String message) { JSONObject messageJson = JSONObject.parseObject(message); String dataString = messageJson.getString("data"); // --------------------base64解码字符串-------------------- String data_string = ""; final Base64.Decoder decoder = Base64.getDecoder(); try{ data_string = new String(decoder.decode(dataString), "UTF-8"); }catch (Exception e){ System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e); } // string转换为json,只取eventCode = '145001'沉浸式的 JSONObject dataJson = JSONObject.parseObject(data_string); return dataJson; } /** * 从数据库查询 * @param resourceId * @param resourceType * @return */ public List<TagNameTypeInfo> queryByIdAndType(String resourceId, String resourceType ){ List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>(); try { tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType); } catch (Exception e){ System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到......."); } return tagNameTypeInfos; } } |