Flink源码阅读:Kafka Connector

news/2026/1/20 22:14:51/文章来源:https://www.cnblogs.com/Jackeyzhe/p/19508927

本文我们来梳理 Kafka Connector 相关的源码。

自定义 Source 和 Sink

在介绍 Kafka Connector 之前,我们先来看一下在 Flink 中是如何支持自定义 Source 和 Sink 的。我们来看一张 Flink 官方文档提供的图。

tableconnector

这张图展示了 Connector 的基本体系结构,三层架构也非常清晰。

Metadata

首先是最上层的 MetaData,CREATE TABLE 会更新 Catalog,然后被转换为 TableAPI 的 CatalogTable,CatalogTable 实例用于表示动态表(Source 或 Sink 表)的元信息。

Planning

在解析和优化程序时,会将 CatalogTable 转换为 DynamicTableSource 和 DynamicTableSink,分别用于查询和插入数据,这两个实例的创建都需要对应的工厂类,工厂类的完整路径需要放到这个配置文件中。

META-INF/services/org.apache.flink.table.factories.Factory

如果有需要的话,我们还可以在解析过程中配置编码和解码方法。

在 Source 端,通过三个接口支持不同的查询能力。

  • ScanTableSource:用于消费 changelog 流,扫描的数据支持 insert、updata、delete 三种类型。ScanTableSource 还支持很多其他的功能, 都是通过接口提供的。具体可以看参考这个连接

    https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sourcessinks/#source-abilities
    
  • LookupTableSource:LookupTableSource 不会全量读取表的数据,它在需要时会发送请求,懒加载数据。目前只支持 insert-only 变更模式。

  • VectorSearchTableSource:使用一个输入向量来搜索数据,并返回最相似的 Top-K 行数据。

在 Sink 端,通过 DynamicTableSink 来实现具体的写入逻辑,这里也提供了一些用于扩展能力的接口。具体参考

https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sourcessinks/#sink-abilities

Runtime

逻辑解析完成后,会到 Runtime 层。这里就是定义几个 Provider,在 Provider 中实现和连接器具体的交互逻辑。

小结

当我们需要创建一个自定义的 Source 和 Sink 时,就可以通过以下步骤实现。

  1. 定义 Flink SQL 的 DDL,需要定义相应的 Options。

  2. 实现 DynamicTableSourceFactory 和 DynamicTableSinkFactory,并把实现类的具体路径写到配置文件中。

  3. 实现 DynamicTableSource 和 DynamicTableSink,这里需要处理 SQL 层的元数据。

  4. 提供 Provider,将逻辑层与底层 DataStream 关联起来。

  5. 编写底层算子,实现 Source 和 Sink 接口。

Kafka Connector 的实现

带着这些知识,我们一起来看一下 Kafka Connector 相关的源码。

Kafka Connector 代码目前已经是一个独立的项目了。项目地址是

https://github.com/apache/flink-connector-kafka

Factory

我们首先找到定义的工厂类

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

以 KafkaDynamicTableFactory 为例,它同时实现了 DynamicTableSourceFactory 和 DynamicTableSinkFactory 两个接口。

KafkaDynamicTableFactory 包含以下几个方法。

KafkaDynamicTableFactory

  • factoryIdentifier:返回一个唯一标识符,对应 Flink SQL 中 connector='xxx' 这个配置。

  • requiredOptions:必填配置集合。

  • optionalOptions:选填配置集合。

  • forwardOptions:直接传递到 Runtime 层的配置集合。

  • createDynamicTableSource:创建 DynamicTableSource。

  • createDynamicTableSink:创建 DynamicTableSink。

Source 端

工厂类的 createDynamicTableSource 方法创建了 DynamicTableSource,我们来看一下创建的逻辑。

public DynamicTableSource createDynamicTableSource(Context context) {final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =getKeyDecodingFormat(helper);final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =getValueDecodingFormat(helper);helper.validateExcept(PROPERTIES_PREFIX);final ReadableConfig tableOptions = helper.getOptions();validateTableSourceOptions(tableOptions);validatePKConstraints(context.getObjectIdentifier(),context.getPrimaryKeyIndexes(),context.getCatalogTable().getOptions(),valueDecodingFormat);final StartupOptions startupOptions = getStartupOptions(tableOptions);final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());// add topic-partition discoveryfinal Duration partitionDiscoveryInterval =tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),Long.toString(partitionDiscoveryInterval.toMillis()));final DataType physicalDataType = context.getPhysicalRowDataType();final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);return createKafkaTableSource(physicalDataType,keyDecodingFormat.orElse(null),valueDecodingFormat,keyProjection,valueProjection,keyPrefix,getTopics(tableOptions),getTopicPattern(tableOptions),properties,startupOptions.startupMode,startupOptions.specificOffsets,startupOptions.startupTimestampMillis,boundedOptions.boundedMode,boundedOptions.specificOffsets,boundedOptions.boundedTimestampMillis,context.getObjectIdentifier().asSummaryString(),parallelism);
}

在这个方法中,首先要获取到 key 和 value 的解码格式。接着是各种参数校验和获取必要的属性。最后创建 KafkaDynamicSource 实例。

获取解码格式需要用到 DeserializationFormatFactory 工厂,DeserializationFormatFactory 有多个实现类,对应了多种格式的反序列化方法。

DeserializationFormatFactory

我们来看比较常见的 Json 格式的工厂 JsonFormatFactory。

public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {FactoryUtil.validateFactoryOptions(this, formatOptions);JsonFormatOptionsUtil.validateDecodingFormatOptions(formatOptions);final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);final boolean jsonParserEnabled = formatOptions.get(DECODE_JSON_PARSER_ENABLED);TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {@Overridepublic DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,DataType physicalDataType,int[][] projections) {final DataType producedDataType =Projection.of(projections).project(physicalDataType);final RowType rowType = (RowType) producedDataType.getLogicalType();final TypeInformation<RowData> rowDataTypeInfo =context.createTypeInformation(producedDataType);if (jsonParserEnabled) {return new JsonParserRowDataDeserializationSchema(rowType,rowDataTypeInfo,failOnMissingField,ignoreParseErrors,timestampOption,toProjectedNames((RowType) physicalDataType.getLogicalType(), projections));} else {return new JsonRowDataDeserializationSchema(rowType,rowDataTypeInfo,failOnMissingField,ignoreParseErrors,timestampOption);}}@Overridepublic ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();}@Overridepublic boolean supportsNestedProjection() {return jsonParserEnabled;}};
}

在创建解码格式时,最重要的是创建运行时的解码器,也就是 DeserializationSchema,在 JsonFormatFactory 中,有 JsonParserRowDataDeserializationSchema 和 JsonRowDataDeserializationSchema 两种实现,分别是用于将 JsonParser 和 JsonNode 转换成为 RowData,具体的逻辑都在 createNotNullConverter 方法中。

了解完解码格式后,我们把视角拉回到 KafkaDynamicSource,它实现了三个接口 ScanTableSource、SupportsReadingMetadata、SupportsWatermarkPushDown。分别用于消费数据,读取元数据和生成水印。

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {final DeserializationSchema<RowData> keyDeserialization =createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);final DeserializationSchema<RowData> valueDeserialization =createDeserialization(context, valueDecodingFormat, valueProjection, null);final TypeInformation<RowData> producedTypeInfo =context.createTypeInformation(producedDataType);final KafkaSource<RowData> kafkaSource =createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo);return new DataStreamScanProvider() {@Overridepublic DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {if (watermarkStrategy == null) {watermarkStrategy = WatermarkStrategy.noWatermarks();}DataStreamSource<RowData> sourceStream =execEnv.fromSource(kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier);providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);return sourceStream;}@Overridepublic boolean isBounded() {return kafkaSource.getBoundedness() == Boundedness.BOUNDED;}@Overridepublic Optional<Integer> getParallelism() {return Optional.ofNullable(parallelism);}};
}

在 ScanRuntimeProvider 的逻辑中,先获取到反序列化器,也就是刚刚我们提到的 DeserializationSchema。

KafkaSource

然后开始创建 KafkaSource 实例,它是 Source 的实现类,也就是执行引擎层了,这个过程会依次创建图中这些类。

KafkaSource 中主要是创建 KafkaSourceReader 和 KafkaSourceEnumerator,KafkaSourceEnumerator 是负责和分片相关的逻辑,包括分片分配和分片发现等。

KafkaSourceReader 中主要是和 State 相关的逻辑,包括触发快照和完成 Checkpoint 通知的方法。当做 Snapshot 时,会记录活跃 split 的 offset,同时将 split 作为状态提交。当 Checkpoint 完成时,会调用 KafkaSourceFetcherManager.commitOffsets 提交 offset。

public List<KafkaPartitionSplit> snapshotState(long checkpointId) {List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);if (!commitOffsetsOnCheckpoint) {return splits;}if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {offsetsToCommit.put(checkpointId, Collections.emptyMap());} else {Map<TopicPartition, OffsetAndMetadata> offsetsMap =offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());// Put the offsets of the active splits.for (KafkaPartitionSplit split : splits) {// If the checkpoint is triggered before the partition starting offsets// is retrieved, do not commit the offsets for those partitions.if (split.getStartingOffset() >= 0) {offsetsMap.put(split.getTopicPartition(),new OffsetAndMetadata(split.getStartingOffset()));}}// Put offsets of all the finished splits.offsetsMap.putAll(offsetsOfFinishedSplits);}return splits;
}public void notifyCheckpointComplete(long checkpointId) throws Exception {LOG.debug("Committing offsets for checkpoint {}", checkpointId);...((KafkaSourceFetcherManager) splitFetcherManager).commitOffsets(committedPartitions,(ignored, e) -> {...});
}

KafkaSourceFetcherManager 负责管理 fetcher 线程,提交 Offset。

KafkaPartitionSplitReader 的 fetch 方法用来消费 Kafka 的数据。

public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {ConsumerRecords<byte[], byte[]> consumerRecords;try {consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));} catch (WakeupException | IllegalStateException e) {// IllegalStateException will be thrown if the consumer is not assigned any partitions.// This happens if all assigned partitions are invalid or empty (starting offset >=// stopping offset). We just mark empty partitions as finished and return an empty// record container, and this consumer will be closed by SplitFetcherManager.KafkaPartitionSplitRecords recordsBySplits =new KafkaPartitionSplitRecords(ConsumerRecords.empty(), kafkaSourceReaderMetrics);markEmptySplitsAsFinished(recordsBySplits);return recordsBySplits;}KafkaPartitionSplitRecords recordsBySplits =new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);List<TopicPartition> finishedPartitions = new ArrayList<>();for (TopicPartition tp : consumer.assignment()) {long stoppingOffset = getStoppingOffset(tp);long consumerPosition = getConsumerPosition(tp, "retrieving consumer position");// Stop fetching when the consumer's position reaches the stoppingOffset.// Control messages may follow the last record; therefore, using the last record's// offset as a stopping condition could result in indefinite blocking.if (consumerPosition >= stoppingOffset) {LOG.debug("Position of {}: {}, has reached stopping offset: {}",tp,consumerPosition,stoppingOffset);recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);finishSplitAtRecord(tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits);}}// Only track non-empty partition's record lag if it never appears beforeconsumerRecords.partitions().forEach(trackTp -> {kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);});markEmptySplitsAsFinished(recordsBySplits);// Unassign the partitions that has finished.if (!finishedPartitions.isEmpty()) {finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);unassignPartitions(finishedPartitions);}// Update numBytesInkafkaSourceReaderMetrics.updateNumBytesInCounter();return recordsBySplits;
}

至此,Source 端相关的源码我们就梳理完了。接下来我们再看 Sink 端的代码。

Sink 端

我们从工厂类中的 createDynamicTableSink 方法开始。

public DynamicTableSink createDynamicTableSink(Context context) {final TableFactoryHelper helper =FactoryUtil.createTableFactoryHelper(this, autoCompleteSchemaRegistrySubject(context));final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =getKeyEncodingFormat(helper);final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =getValueEncodingFormat(helper);helper.validateExcept(PROPERTIES_PREFIX);final ReadableConfig tableOptions = helper.getOptions();final DeliveryGuarantee deliveryGuarantee = validateDeprecatedSemantic(tableOptions);validateTableSinkOptions(tableOptions);KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);validatePKConstraints(context.getObjectIdentifier(),context.getPrimaryKeyIndexes(),context.getCatalogTable().getOptions(),valueEncodingFormat);final DataType physicalDataType = context.getPhysicalRowDataType();final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);return createKafkaTableSink(physicalDataType,keyEncodingFormat.orElse(null),valueEncodingFormat,keyProjection,valueProjection,keyPrefix,getTopics(tableOptions),getTopicPattern(tableOptions),getKafkaProperties(context.getCatalogTable().getOptions()),getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),deliveryGuarantee,parallelism,tableOptions.get(TRANSACTIONAL_ID_PREFIX),tableOptions.get(TRANSACTION_NAMING_STRATEGY));
}

和 Source 的流程很相似,这里首先是获取 key 和 value 的编码格式,然后做了很多校验,最后是创建 KafkaDynamicSink 实例。

获取编码格式用到的工厂类是 SerializationFormatFactory,我们前面介绍的 JsonFormatFactory 也实现了 SerializationFormatFactory,因此它既提供了解码格式,又提供了编码格式。编码格式用到的编码器是 JsonRowDataSerializationSchema,通过 RowDataToJsonConverters 将 RowData 转换成 JsonNode。

在 KafkaDynamicSink 的 getSinkRuntimeProvider 方法中,主要就是创建 KafkaSink 实例。

KafkaSink

KafkaSink 类实现了 TwoPhaseCommittingStatefulSink 接口,即支持两阶段提交。它创建了 KafkaWrter 和 KafkaCommiter。

创建 KafkaWriter 时,如果配置的是 ExactlyOnce 模式,则会创建出 ExactlyOnceKafkaWriter,否则创建 KafkaWriter。Writer 真正实现两阶段提交的是 ExactlyOnceKafkaWriter。它在启动时,会调用 producer.beginTransaction 开启一个事务。数据写入时会调用 KafkaWriter.write 方法,此操作会被标记为事务内的操作。当 Sink 收到 Barrier 时,会先调用 flush 方法,将缓冲区的数据都发送到 Kafka Broker,然后调用 prepareCommit 方法预提交。预提交方法中记录 epoch 和 transactionalId 返回给框架层。

public Collection<KafkaCommittable> prepareCommit() {// only return a KafkaCommittable if the current transaction has been written some dataif (currentProducer.hasRecordsInTransaction()) {KafkaCommittable committable = KafkaCommittable.of(currentProducer);LOG.debug("Prepare {}.", committable);currentProducer.precommitTransaction();return Collections.singletonList(committable);}// otherwise, we recycle the producer (the pool will reset the transaction state)producerPool.recycle(currentProducer);return Collections.emptyList();
}

状态保存时,会将预提交的 transactionalId 存到状态中。

public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {// recycle committed producersTransactionFinished finishedTransaction;while ((finishedTransaction = backchannel.poll()) != null) {producerPool.recycleByTransactionId(finishedTransaction.getTransactionId(), finishedTransaction.isSuccess());}// persist the ongoing transactions into the state; these will not be aborted on restartCollection<CheckpointTransaction> ongoingTransactions =producerPool.getOngoingTransactions();currentProducer = startTransaction(checkpointId + 1);return createSnapshots(ongoingTransactions);
}private List<KafkaWriterState> createSnapshots(Collection<CheckpointTransaction> ongoingTransactions) {List<KafkaWriterState> states = new ArrayList<>();int[] subtaskIds = this.ownedSubtaskIds;for (int index = 0; index < subtaskIds.length; index++) {int ownedSubtask = subtaskIds[index];states.add(new KafkaWriterState(transactionalIdPrefix,ownedSubtask,totalNumberOfOwnedSubtasks,transactionNamingStrategy.getOwnership(),// new transactions are only created with the first owned subtask idindex == 0 ? ongoingTransactions : List.of()));}LOG.debug("Snapshotting state {}", states);return states;
}

当 Checkpoint 完成时,会调用 KafkaCommitter.commit 方法。在 commit 方法中会调用 producer.commitTransaction 正式提交事务。

FlinkKafkaInternalProducer 是 Flink 内部封装的与 Kafka 生产者的交互类,所有与 Kafka 生产者的交互都通过它执行。

关于 Kafka Connector 的 Sink 端的源码我们就梳理到这里。

总结

最后还是总结一下。本文我们先了解了 Flink 中自定义 Source 和 Sink 的流程。按照这个流程,我们梳理了 Kafka Connector 的源码。在 Source 端,Flink Kafka 封装了对消费者 Offset 的提交逻辑。在 Sink 端结合了 Kafka 提供的事务支持实现了两阶段提交的逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1190977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jetson 磁盘加密学习笔记:从 LUKS/dm-crypt 到 APP/APP_ENC 与量产流程

📺 B站视频讲解(Bilibili):博主个人介绍 📘 《Yocto项目实战教程》京东购买链接:Yocto项目实战教程 Jetson 磁盘加密学习笔记:从 LUKS/dm-crypt 到 APP/APP_ENC 与量产流程 目标:基于 NVIDIA Jetson Linux R36.4.3 官方文档,把 Jetson 的 Disk Encryption(磁盘加密…

计算机大数据毕设实战-基于hadoop的山东瓜果蔬菜分析系统【完整源码+LW+部署说明+演示视频,全bao一条龙等】

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…

AI写作知识体系:架构、理论与工程实践【高级版】

AI写作知识体系&#xff1a;架构、理论与工程实践 摘要 本研究旨在系统构建与阐释人工智能写作领域的知识体系&#xff0c;提出并论证了一个多层次、跨学科的“五层金字塔”理论架构。该体系自上而下整合了产业应用、创作理论、工程方法、技术模型与基础理论五大维度&#xf…

树的练习7--------LCR 052.递增顺序搜索树

前言 今天这一题也是非常的憋屈&#xff0c;在递归上的运用可谓是得心应手&#xff0c;但是在一些小细节上却存在着致命的问题&#xff0c;现在来总结一下。 题目&#xff1a;点这里 解法&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* in…

亲测有效!论文降AI率压到10%以下的技巧:这4个指令+3个技巧,降AI率真的够用了!

写完论文之后&#xff0c;你最怕的是什么&#xff1f; 我猜肯定是查重和查AI率吧。 AI率太高学校不让过&#xff0c;太低又显得不太真实…… 这意味着你得反复改、反复调&#xff0c;不停换词换句式&#xff0c;费时又费脑&#xff0c;简直让人头大。 别慌&#xff0c;这事儿…

大数据毕设选题推荐:基于python的机器学习房价预测可视化系统基于机器学习的房子价值预测系统的设计与实现【附源码、mysql、文档、调试+代码讲解+全bao等】

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…

pycharm inherit packages from base interpreter 和 make available to all projects 的区别是什么

在 PyCharm 中,当你配置 Python 解释器(Interpreter)时,会遇到两个相关的选项:Inherit packages from base interpreter Make available to all projects这两个选项分别作用于虚拟环境的包继承行为和解释器本身的…

苹果用户福音:Chrome新工具让Safari迁移变得极简

谷歌正在开发一个全新的"Safari导入"工具&#xff0c;让iPhone用户从Safari切换到Chrome浏览器变得前所未有的简单。据The MacObserver报告&#xff0c;这项功能将彻底改变目前仅限于桌面端的复杂迁移流程。新工具工作原理与使用步骤这个新工具将基于iPhone系统设置中…

光束驱动AI计算实现超级计算机级性能

张量运算是一种支撑现代技术特别是人工智能的高级数学形式。这些运算远超人们日常遇到的简单计算。可以将其想象为同时在多个维度操控魔方&#xff0c;通过旋转、切片或重新排列各个层面。人类和传统计算机必须将这些任务分解为序列&#xff0c;但光可以同时执行所有操作。如今…

大数据毕设项目:基于django的城市房产价值的数据分析与预测系统的设计与实现(源码+文档,讲解、调试运行,定制等)

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…

9个降AI率工具推荐!继续教育学生必看

9个降AI率工具推荐&#xff01;继续教育学生必看 AI降重工具&#xff1a;让论文更自然&#xff0c;更安心 在当前的学术环境中&#xff0c;越来越多的学生开始关注“**论文降AIGC率**”和“**去AI痕迹**”的问题。随着AI写作工具的普及&#xff0c;许多论文在内容上虽然逻辑清晰…

kali和centOS在用户创建相关方面命令的差别

kali和centOS在用户创建相关方面命令的差别kali和centOS在用户创建相关方面命令的差别问题根源useradd 是极简创建 在 Kali 这类系统里, useradd 只会在系统数据库里创建用户条目,默认不会自动创建主目录( /home/…

MagSafe星期一:Anker超薄Qi2电池包成为EDC装备完美搭档

个人而言&#xff0c;我很想购买iPhone 17 mini&#xff0c;但无论如何&#xff0c;Max机型对我的前口袋来说太大了&#xff0c;我更喜欢能够单手使用手机。然而&#xff0c;坚持使用较小电池容量的机型迫使我重新思考我的EDC装备。由于我不会拥有Pro Max的大容量电池&#xff…

1990:种下那棵不落叶的树-第5集:文件系统的名字

笔言: 故事大纲&#xff08;45集微故事版&#xff09; 核心设定 作品信息&#xff1a;书名《1990&#xff1a;种下那棵不落叶的树》&#xff0c;文艺副标题《代码山河》&#xff1b;核心意象&#xff1a;以“树”【Linux】为核心&#xff0c;根系对应内核、枝干对应子系统、叶…

百乐满热水器维修电话:深圳用户必看!深圳百乐满售后联系方式与专业服务指南 - 小白条111

百乐满热水器维修电话:深圳用户必看!深圳百乐满售后联系方式与专业服务指南Paloma 百乐满热水器售后维修(深圳)中心作为深圳区域指定授权机构(百乐满热水器售后维修(深圳)中心 24小时维修热线电话:4001166000)…

大数据计算机毕设之基于hadoop的山东瓜果蔬菜分析系统(完整前后端代码+说明文档+LW,调试定制等)

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…

【计算机毕业设计案例】基于django大数据在直播带货商品选品中的应用(程序+文档+讲解+定制)

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…

Wi-Fi信号检测心跳技术:非接触式心率监测新突破

加利福尼亚的一个研究团队提出了一项创新技术&#xff0c;利用环境Wi-Fi信号来监测人体心率。这项名为Pulse-Fi的新方法为现有心率监测方法提供了显著优势&#xff1a;成本低廉、易于部署&#xff0c;并且无需用户佩戴任何设备。 加州大学圣克鲁兹分校教授Katia Obraczka领导了…

2026年11项惊人工程技术突破即将到来

脑芯片帮助盲人重见光明埃隆马斯克表示&#xff0c;他的公司Neuralink计划在2026年帮助完全失明的患者恢复部分视力。该公司计划在今年早期对其最新、最强大的植入物Blindsight进行人体测试。该芯片将无线连接到外部摄像头&#xff0c;并植入大脑视觉皮层。通过绕过眼睛&#x…

大数据毕设选题推荐:基于django大数据在直播带货商品选品中的应用【附源码、mysql、文档、调试+代码讲解+全bao等】

java毕业设计-基于springboot的(源码LW部署文档全bao远程调试代码讲解等) 博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、…