SpringCloudStreamkafka接收jsonarray字符串失败

文章目录

    • 场景现象
    • 问题处理

场景现象

  • kafka作为消息队列,作为前端设备数据到后端消费的渠道,也被多个不同微服务消费
  • 一个服务与前端边缘计算设备建立socket消息,接收实时交通事件推送,再将事件发送到kafka里面。此处使用的是Spring Kafka,普通的将事件列表数据转化为字符串后发送
  • 事件信息,需要入库和实时推送,也需要被第三方服务调用,第三方也采取消费kafka的方式
  • 这个服务,作为kafka的消费者,读取事件数据,发送短信给指定用户,但此处使用的是Spring.cloud.stream,函数式编程,绑定kafka的topic
  • 此服务消费时,加了一些打印。但是事件推送到kafka后,看不到任何日志打印,也没有报错。但是使用kafka命令查看,却已经被该消费者消费掉了
  • 消费的源代码如下:
    @Beanpublic Consumer<String> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);return value -> {log.info("Consumer Received (boxTopic): " + value);// todo : do somethings};}

问题处理

  • 消费某个多分区topic时,发现只有其中一个分区数据有打印,其他都没有,但是查看却发现都消费掉了
2023-09-21 13:45:13.706  INFO 2184 --- [container-0-C-1] c.newatc.data.kafka.KafkaConfiguration   : Consumer Received (boxTopic): {"equipmentNo":0,"laneNum":4,"statisticsCycle":60,"time":1695275100110,"vehicleLanes":[{"averageParkingNum":0.5,"averageSpeed":6.1,"lane":1,"laneFlow":2,"maxQueueLength":12,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":2,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":15.3,"vehicleAverageTravelTime":0.2},{"averageParkingNum":0.0,"averageSpeed":35.7,"lane":2,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":1,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":0.0,"lane":3,"laneFlow":3,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":0,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":25.4,"lane":4,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":5,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":1.6}]}
  • 最终测试发现,列表转化为jsonarray字符串,无法打印。但是单条数据json字符串却可以
  • 将代码改为String[]后发现报错,代码如下
    @Beanpublic Consumer<String[]> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);return value -> {log.info("Consumer Received (boxTopic): " + value.length);};}
  • 报错如下:
2023-09-21 11:47:59.684 ERROR 9112 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for boxTopic-3@29941org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]), failedMessage=GenericMessage [payload=byte[96], headers={kafka_offset=29941, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3c31888e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=3, kafka_receivedMessageKey=[B@3fcf7bd1, kafka_receivedTopic=boxTopic, kafka_receivedTimestamp=1695267871945, contentType=application/json, kafka_groupId=data-center}]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2683)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237)at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:113)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176)at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:48)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1282)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1057)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:696)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:551)at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:754)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:586)at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1420)at com.fasterxml.jackson.databind.DeserializationContext.extractScalarFromObject(DeserializationContext.java:932)at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseString(StdDeserializer.java:1292)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:166)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:25)at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:223)... 45 common frames omitted
  • 通过日志可以看出,是在接收数据的解析时发生了问题,jsonarray的字符串无法接受解析
  • 在消息发生时,在正常消息后面,加一个“-”,可以正常接收了,就是解析时需要先删掉尾字符再转json
  • 后面又发现,如果是作为字符接收,再转为字符串,是可以的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/94509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

window安装压缩版postgresql

环境&#xff1a; window 11 专业版postgresql-16.0-1-windows-x64-binaries.zip 一、下载 1.1 从官网下载 https://www.postgresql.org/download/windows/ 1.2 从百度网盘下载 链接&#xff1a;https://pan.baidu.com/s/1fmQbgWSzX4hN07Lgdzfz0g?pwddzyy 提取码&#…

C++ YAML使用

C++工程如何使用YAML-cpp 一、前期准备工作 1、已安装minGW、cmake、make等本地工具。 2、下载YAML-cpp第三方开源代码(一定要下载最新的release版本,不然坑很多)。 3、生成YAML-cpp静态库 (1)在yaml-cpp-master下建立build文件夹; (2)在该文件夹下生成MakaFile文…

C/C++字符函数和字符串函数详解————内存函数详解与模拟

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.前言 2 .memcpy函数 3.memmove函…

以太网基础学习(一)——以太网概述

一、以太网概述 以太网(Ethernet)指的是由 Xerox公司创建并由Xerox、Intel和 DEC公司联合开发的基带局域网规范&#xff0c;通用的以太网标准于1980年9月30日出台&#xff0c;是当今现有局域网采用的最通用的通信协议标准&#xff08;是局域网的一种&#xff09;。 以太网是一种…

数据结构与算法之堆: Leetcode 347. 前K个高频元素 (Typescript版)

前K个高频元素 https://leetcode.cn/problems/top-k-frequent-elements/ 描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你返回其中出现频率前 k 高的元素。你可以按 任意顺序 返回答案。 示例 1 输入: nums [1,1,1,2,2,3], k 2 输出: [1,2]示例 2 输入: num…

【oncmdmsg 鼠标】2023/8/19 上午9:50:14

2023/8/19 上午9:50:14 oncmdmsg 鼠标 2023/8/19 上午9:50:22 抱歉,您提到的 “oncmdmsg” 和 “鼠标” 是什么?请提供更多上下文信息,我将尽力帮助您。 2023/8/19 上午9:51:43 OnCmdMsg 2023/8/19 上午9:52:21 “OnCmdMsg” 是一个在 MFC (Microsoft Foundation Cla…

(C++版)ROS2 bind函数解读

在ros2的发布者节点里面有这么一句话&#xff1a;估计没有学过C的人不太理解&#xff0c;这里我就发发好心帮忙解读一下timer_ this->create_wall_timer(500ms, std::bind(&MinimalPublisher::timer_callback, this)); timer_ this->create_wall_timer(500ms, std…

url请求头信息

Accept Accept&#xff1a;请求报头域&#xff0c;用于指定客户端可接受哪些类型的信息。 Accept-Language Accept-Language&#xff1a;指定客户端可接受的语言类型。 Accept-Encoding Accept-Encoding&#xff1a;指定客户端可接受的内容编码。 Host Host&#xff1a;…

自动驾驶中的感知模型:实现安全与智能驾驶的关键

自动驾驶中的感知模型&#xff1a;实现安全与智能驾驶的关键 文章目录 引言感知模型的作用感知模型的技术安全与挑战结论 2023星火培训【专项营】Apollo开发者社区布道师倾力打造&#xff0c;包含PnC、新感知等的全新专项课程上线了。理论与实践相结合&#xff0c;全新的PnC培训…

unplugin-vue-components和unplugin-auto-import插件

unplugin-auto-import&#xff1a;自动按需引入 vue\vue-router\pinia 等的 api unplugin-vue-components&#xff1a;自动按需引入 第三方的组件库组件 和 我们自定义的组件 使用此类插件&#xff0c;不需要手动编写import {xxx} from vue这样的代码了&#xff0c;提升开发效…

Linux驱动设备号分配与自动创建设备节点

Linux 驱动设备号 对于 Linux 系统&#xff0c;为了识别和管理设备&#xff0c;每个设备便使用一个唯一的编号来标记设备&#xff0c;每个注册到内核的设备都需要一个编号&#xff0c;这个编号就是设备号&#xff0c;为了细分设备号分为主设备号和次设备号。 由于 Linux 的设…

【Java-LangChain:使用 ChatGPT API 搭建系统-6】处理输入-链式 Prompt Chaining Prompts

第六章&#xff0c;处理输入-链式 Prompt Chaining Prompts 在本章中&#xff0c;我们将学习如何通过将复杂任务拆分为一系列简单的子任务来链接多个 Prompt。 您可能会想&#xff0c;为什么要将任务拆分为多个 Prompt&#xff0c;而不是像我们在上一个视频中学习的那样&…

嵌入式数据库sqlite3基本命令操作基础(05)

前言 数据在实际工作中应用非常广泛&#xff0c;数据库的产品也比较多&#xff0c;oracle、DB2、SQL2000、mySQL&#xff1b;基于嵌入式linux的数据库主要有SQLite, Firebird, Berkeley DB, eXtremeDB。 本文主要讲解数据库SQLite&#xff0c;通过这个开源的小型的嵌入式数据…

基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度(matlab代码)

目录 1 主要内容 系统结构图 P2G-CCS 耦合模型 其他算例对比 2 部分代码 3 下载链接 1 主要内容 该程序复现《基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度》模型&#xff0c;以碳交易和碳封存成本、燃煤机组启停和煤耗成本、弃风成本、购气成本之和为目标…

CANoe.Diva生成测试用例

Diva目录 一、CANoe.Diva打开CDD文件二、导入CDD文件三、ECU Information四、时间参数设置五、选择是否测试功能寻址六、勾选需要测试服务项七、生成测试用例 一、CANoe.Diva打开CDD文件 CANoe.Diva可以通过导入cdd或odx文件&#xff0c;自动生成全面的测试用例。再在CANoe中导…

简单查找重复文本文件

声明这是最初 我的提问给个文本分类清单input查找文件夹下 .py .txt .excel .word 一模一样的文本不是找文件名 找相同格式下的文件文本是否一样 文件单独复制到文件夹下两个文件全部复制到文件夹下 print 打印相同文本文件的名字 比如查找到了3.py与4.5.是.py文件中的文本文件…

二分查找模版

对于一个递增序列我们要找大于等于target的数&#xff0c;返回结果的下标时 比如 序列 5 7 7 8 8 10 初始化左右指针l0 rn-1 猜测区间 [l,r] 闭区间&#xff0c;mid(lr)/2 防溢出就写成 midl(r-l)/2 如果有nums[mid]<target 那么[l,mid]这个区间的数就都小于target 更新 lmi…

5.Vectors Transformation Rules

在上节&#xff0c;有个问题&#xff1a;向量分量的转换方式 与 新旧基底的转换方式相反 用例子来感受一下&#xff0c; 空间中一向量V&#xff0c;即该空间的一个基底&#xff1a;e1、e2 v e1 e2 现把基底 e1 、 e2 放大两倍。变成 基向量放大了两倍&#xff0c; 但对于…

Javascript 事件的动态绑定

动态绑定事件&#xff0c;是指在代码执行过程中&#xff0c;通过Javascript代码来绑定事件。这种技术可以大大增强网页的交互性和用户体验。上一期介绍的是通过事件监听器 EventListener 去实现元素颜色的变化。这一期将通过动态绑定方法去实现&#xff0c;对象.事件 匿名函数…

【广州华锐互动】鱼类授精繁殖VR虚拟仿真实训系统

随着科技的不断发展&#xff0c;虚拟现实技术在各个领域的应用越来越广泛。在养殖业中&#xff0c;VR技术可以帮助养殖户进行家鱼授精实操演练&#xff0c;提高养殖效率和繁殖成功率。本文将介绍利用VR开展家鱼授精实操演练的方法和应用。 首先&#xff0c;我们需要了解家鱼授精…