档案网站建设书管理网站怎么做的
档案网站建设书,管理网站怎么做的,网站制作后台怎么做,服装网站建设的目的文章目录 场景现象问题处理 场景现象
kafka作为消息队列#xff0c;作为前端设备数据到后端消费的渠道#xff0c;也被多个不同微服务消费一个服务与前端边缘计算设备建立socket消息#xff0c;接收实时交通事件推送#xff0c;再将事件发送到kafka里面。此处使用的是Spri… 文章目录 场景现象问题处理 场景现象
kafka作为消息队列作为前端设备数据到后端消费的渠道也被多个不同微服务消费一个服务与前端边缘计算设备建立socket消息接收实时交通事件推送再将事件发送到kafka里面。此处使用的是Spring Kafka普通的将事件列表数据转化为字符串后发送事件信息需要入库和实时推送也需要被第三方服务调用第三方也采取消费kafka的方式这个服务作为kafka的消费者读取事件数据发送短信给指定用户但此处使用的是Spring.cloud.stream函数式编程绑定kafka的topic此服务消费时加了一些打印。但是事件推送到kafka后看不到任何日志打印也没有报错。但是使用kafka命令查看却已经被该消费者消费掉了消费的源代码如下 Beanpublic ConsumerString boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info(Consumer Received (boxTopic): hwTrafficEventManager);return value - {log.info(Consumer Received (boxTopic): value);// todo : do somethings};}问题处理
消费某个多分区topic时发现只有其中一个分区数据有打印其他都没有但是查看却发现都消费掉了
2023-09-21 13:45:13.706 INFO 2184 --- [container-0-C-1] c.newatc.data.kafka.KafkaConfiguration : Consumer Received (boxTopic): {equipmentNo:0,laneNum:4,statisticsCycle:60,time:1695275100110,vehicleLanes:[{averageParkingNum:0.5,averageSpeed:6.1,lane:1,laneFlow:2,maxQueueLength:12,nonVehicleAverageTravelTime:0.0,overStopLineFlow:2,pedestrianAverageTravelTime:0,vehicleAverageDelay:15.3,vehicleAverageTravelTime:0.2},{averageParkingNum:0.0,averageSpeed:35.7,lane:2,laneFlow:1,maxQueueLength:0,nonVehicleAverageTravelTime:0.0,overStopLineFlow:1,pedestrianAverageTravelTime:0,vehicleAverageDelay:0.0,vehicleAverageTravelTime:0.0},{averageParkingNum:0.0,averageSpeed:0.0,lane:3,laneFlow:3,maxQueueLength:0,nonVehicleAverageTravelTime:0.0,overStopLineFlow:0,pedestrianAverageTravelTime:0,vehicleAverageDelay:0.0,vehicleAverageTravelTime:0.0},{averageParkingNum:0.0,averageSpeed:25.4,lane:4,laneFlow:1,maxQueueLength:0,nonVehicleAverageTravelTime:0.0,overStopLineFlow:5,pedestrianAverageTravelTime:0,vehicleAverageDelay:0.0,vehicleAverageTravelTime:1.6}]}最终测试发现列表转化为jsonarray字符串无法打印。但是单条数据json字符串却可以将代码改为String[]后发现报错代码如下 Beanpublic ConsumerString[] boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {log.info(Consumer Received (boxTopic): hwTrafficEventManager);return value - {log.info(Consumer Received (boxTopic): value.length);};}报错如下
2023-09-21 11:47:59.684 ERROR 9112 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff none exhausted for boxTopic-329941org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type [Ljava.lang.String; from Object value (token JsonToken.START_OBJECT)at [Source: (byte[])[{equipmentNo:0,eventCode:8,eventType:1,laneTurn:0,relationNo:1,time:1695267981221}]; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type [Ljava.lang.String; from Object value (token JsonToken.START_OBJECT)at [Source: (byte[])[{equipmentNo:0,eventCode:8,eventType:1,laneTurn:0,relationNo:1,time:1695267981221}]; line: 1, column: 2] (through reference chain: java.lang.Object[][0]), failedMessageGenericMessage [payloadbyte[96], headers{kafka_offset29941, kafka_consumerorg.apache.kafka.clients.consumer.KafkaConsumer3c31888e, deliveryAttempt3, kafka_timestampTypeCREATE_TIME, kafka_receivedPartitionId3, kafka_receivedMessageKey[B3fcf7bd1, kafka_receivedTopicboxTopic, kafka_receivedTimestamp1695267871945, contentTypeapplication/json, kafka_groupIddata-center}]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2683)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type [Ljava.lang.String; from Object value (token JsonToken.START_OBJECT)at [Source: (byte[])[{equipmentNo:0,eventCode:8,eventType:1,laneTurn:0,relationNo:1,time:1695267981221}]; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type [Ljava.lang.String; from Object value (token JsonToken.START_OBJECT)at [Source: (byte[])[{equipmentNo:0,eventCode:8,eventType:1,laneTurn:0,relationNo:1,time:1695267981221}]; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237)at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:113)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176)at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:48)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1282)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1057)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:696)at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:551)at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:754)at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:586)at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type [Ljava.lang.String; from Object value (token JsonToken.START_OBJECT)at [Source: (byte[])[{equipmentNo:0,eventCode:8,eventType:1,laneTurn:0,relationNo:1,time:1695267981221}]; line: 1, column: 2] (through reference chain: java.lang.Object[][0])at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515)at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1420)at com.fasterxml.jackson.databind.DeserializationContext.extractScalarFromObject(DeserializationContext.java:932)at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseString(StdDeserializer.java:1292)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:166)at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:25)at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:223)... 45 common frames omitted通过日志可以看出是在接收数据的解析时发生了问题jsonarray的字符串无法接受解析在消息发生时在正常消息后面加一个“-”可以正常接收了就是解析时需要先删掉尾字符再转json后面又发现如果是作为字符接收再转为字符串是可以的
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/88685.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!