网站建设费 税前扣除吗百度知道合伙人官网

web/2025/9/26 3:45:00/文章来源:
网站建设费 税前扣除吗,百度知道合伙人官网,百度查重入口免费版,2015网站排名文章目录 概要初始化消息发送小结 概要 本文主要概括Spring Kafka生产者发送消息的主流程 代码准备#xff1a; SpringBoot项目中maven填加以下依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent SpringBoot项目中maven填加以下依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.12.RELEASE/versionrelativePath/ !-- lookup parent from repository -- /parent dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency消息发送使用KafkaTemplate Autowired private KafkaTemplateString, String kafkaTemplate;GetMapping(/test/send/{msg}) public String sendMsg(PathVariable String msg) {kafkaTemplate.send(alai_test, msg);return success; }初始化 启动类KafkaAutoConfiguration 有两个地方需要关注 Bean ConditionalOnMissingBean({KafkaTemplate.class}) public KafkaTemplate?, ? kafkaTemplate(ProducerFactoryObject, Object kafkaProducerFactory, ProducerListenerObject, Object kafkaProducerListener) {KafkaTemplateObject, Object kafkaTemplate new KafkaTemplate(kafkaProducerFactory);if (this.messageConverter ! null) {kafkaTemplate.setMessageConverter(this.messageConverter);}kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate; }Bean ConditionalOnMissingBean({ProducerFactory.class}) public ProducerFactory?, ? kafkaProducerFactory() {DefaultKafkaProducerFactory?, ? factory new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String transactionIdPrefix this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix ! null) {factory.setTransactionIdPrefix(transactionIdPrefix);}return factory; }其中的ProducerFactory使用的是DefaultKafkaProducerFactory 在发送消息之前Spring Kafka会先创建Producer,返回的是CloseSafeProducer实现类在该类中有一个委托类ProducerK, V delegate真正的发送消息处理逻辑委托给KafkaProducerKafkaProducer实例构造如下边幅原因这里只展示需要说明的部分 KafkaProducer(MapString, Object configs,SerializerK keySerializer,SerializerV valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptorsK, V interceptors,Time time) {ProducerConfig config new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,valueSerializer));try {MapString, Object userProvidedConfigs config.originals();this.producerConfig config;this.time time;String transactionalId userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;this.clientId config.getString(ProducerConfig.CLIENT_ID_CONFIG);LogContext logContext;if (transactionalId null)logContext new LogContext(String.format([Producer clientId%s] , clientId));elselogContext new LogContext(String.format([Producer clientId%s, transactionalId%s] , clientId, transactionalId));log logContext.logger(KafkaProducer.class);log.trace(Starting the Kafka producer);MapString, String metricTags Collections.singletonMap(client-id, clientId);MetricConfig metricConfig new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)).timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))).tags(metricTags);ListMetricsReporter reporters config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));reporters.add(new JmxReporter(JMX_PREFIX));this.metrics new Metrics(metricConfig, reporters, time);this.partitioner config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);long retryBackoffMs config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);if (keySerializer null) {this.keySerializer config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(), true);} else {config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);this.keySerializer keySerializer;}if (valueSerializer null) {this.valueSerializer config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serializer.class);this.valueSerializer.configure(config.originals(), false);} else {config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);this.valueSerializer valueSerializer;}// load interceptors and make sure they get clientIduserProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);ProducerConfig configWithClientId new ProducerConfig(userProvidedConfigs, false);ListProducerInterceptorK, V interceptorList (List) configWithClientId.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);// 生产者拦截器 if (interceptors ! null)this.interceptors interceptors;elsethis.interceptors new ProducerInterceptors(interceptorList);ClusterResourceListeners clusterResourceListeners configureClusterResourceListeners(keySerializer,valueSerializer, interceptorList, reporters);// 生产者往服务端发送消息的时候规定一条消息最大多大// 如果你超过了这个规定消息的大小你的消息就不能发送过去。// 默认是1M这个值偏小在生产环境中我们需要修改这个值。// 经验值是10M。但是大家也可以根据自己公司的情况来。 this.maxRequestSize config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);//指的是缓存大小//默认值是32M这个值一般是够用如果有特殊情况的时候我们可以去修改这个值。this.totalMemorySize config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// kafka是支持压缩数据的可以设置压缩格式,默认是不压缩支持gzip、snappy、lz4// 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.this.compressionType CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));// 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间,由于缓冲区已满或元数据不可用这些方法可// 能会被阻塞止this.maxBlockTimeMs config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);int deliveryTimeoutMs configureDeliveryTimeout(config, log);this.apiVersions new ApiVersions();this.transactionManager configureTransactionState(config, logContext);// 创建核心组件记录累加器this.accumulator new RecordAccumulator(logContext,config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));ListInetSocketAddress addresses ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));if (metadata ! null) {this.metadata metadata;} else {// 生产者每隔一段时间都要去更新一下集群的元数据,默认5分钟this.metadata new ProducerMetadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),logContext,clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}this.errors this.metrics.sensor(errors);// 真正执行消息发送的逻辑this.sender newSender(logContext, kafkaClient, this.metadata);String ioThreadName NETWORK_THREAD_PREFIX | clientId;this.ioThread new KafkaThread(ioThreadName, this.sender, true);// 开启新的线程this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug(Kafka producer started);} catch (Throwable t) {// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121close(Duration.ofMillis(0), true);// now propagate the exceptionthrow new KafkaException(Failed to construct kafka producer, t);}}创建Sender时的方法如下 // visible for testing Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {// 使用幂等性需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发送一次性最多发送5条int maxInflightRequests producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);// 控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应客户端将// 在必要时重新发送请求或者如果重试耗尽请求失败int requestTimeoutMs producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);ChannelBuilder channelBuilder ClientUtils.createChannelBuilder(producerConfig, time, logContext);ProducerMetrics metricsRegistry new ProducerMetrics(this.metrics);Sensor throttleTimeSensor Sender.throttleTimeSensor(metricsRegistry.senderMetrics);// 初始化了一个重要的管理网路的组件 // connections.max.idle.ms: 默认值是9分钟, 一个网络连接最多空闲多久超过这个空闲时间就关闭这个网络连接。 // max.in.flight.requests.per.connection默认是5, producer向broker发送数据的时候其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数KafkaClient client kafkaClient ! null ? kafkaClient : new NetworkClient(new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics, time, producer, channelBuilder, logContext),metadata,clientId,maxInflightRequests,producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),requestTimeoutMs,producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),time,true,apiVersions,throttleTimeSensor,logContext);short acks Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));return new Sender(logContext,client,metadata,this.accumulator,maxInflightRequests 1,producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),acks,producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), // 重试次数metricsRegistry.senderMetrics,time,requestTimeoutMs,producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),this.transactionManager,apiVersions); }在创建RecordAccumulator时其内部会维护一个ConcurrentMapTopicPartition, DequeProducerBatch batches 该Map的key是TopicPartition这个类重写了equals方法相同的topic相同的分区在batches中属于相同的key,就会被放入到队列Deque中。 消息发送 Spring Kafka对消息的发送最后也是直接委托给了org.apache.kafka.clients.producer.KafkaProducer#doSend方法下面以这个方法作为入口进行分析 Override public FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecordK, V interceptedRecord this.interceptors.onSend(record);return doSend(interceptedRecord, callback); }拦截器 onSend 方法是遍历拦截器 onSend 方法拦截器的目的是将数据处理加工 kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能必须自己实现 ProducerInterceptor 接口 public ProducerRecordK, V onSend(ProducerRecordK, V record) {ProducerRecordK, V interceptRecord record;for (ProducerInterceptorK, V interceptor : this.interceptors) {try {interceptRecord interceptor.onSend(interceptRecord);} catch (Exception e) {// 其中一个拦截器出现处理异常时不回抛出异常只会打印日志// do not propagate interceptor exception, log and continue calling other interceptors// be careful not to throw exception from hereif (record ! null)log.warn(Error executing interceptor onSend callback for topic: {}, partition: {}, record.topic(), record.partition(), e);elselog.warn(Error executing interceptor onSend callback, e);}}return interceptRecord; }ProducerInterceptor的3个方法 onSend: Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作但最好保证不要修改消息所属的topic和分区否则会影响目标分区的计算onAcknowledgement: 该方法会在消息被应答之前或消息发送失败时调用并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中因此不要在该方法中放入很重的逻辑否则会拖慢producer的消息发送效率close: 关闭interceptor主要用于执行一些资源清理工作 消息发送主流程 /*** Implementation of asynchronously send a record to a topic.*/ private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {TopicPartition tp null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {// 首先确保该topic的元数据可用clusterAndWaitTime waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException(Producer closed while send in progress, e);throw e;}nowMs clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster clusterAndWaitTime.cluster;// 序列化 record 的 key 和 valuebyte[] serializedKey;try {serializedKey keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException(Cant convert key of class record.key().getClass().getName() to class producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() specified in key.serializer, cce);}byte[] serializedValue;try {serializedValue valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException(Cant convert value of class record.value().getClass().getName() to class producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() specified in value.serializer, cce);}// 获取该 record 要发送到的 partitionint partition partition(record, serializedKey, serializedValue, cluster);tp new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers record.headers().toArray();int serializedSize AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);ensureValidRecordSize(serializedSize);long timestamp record.timestamp() null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace(Attempting to append record {} with callback {} to topic {} partition {}, record, callback, record.topic(), partition);}// producer callback will make sure to call both callback and interceptor callbackCallback interceptCallback new InterceptorCallback(callback, this.interceptors, tp);// 向 accumulator 中追加 record 数据数据会先进行缓存RecordAccumulator.RecordAppendResult result accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);if (result.abortForNewBatch) {int prevPartition partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition partition(record, serializedKey, serializedValue, cluster);tp new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace(Retrying append due to new batch creation for topic {} partition {}. The old partition was {}, record.topic(), partition, prevPartition);}// producer callback will make sure to call both callback and interceptor callbackinterceptCallback new InterceptorCallback(callback, this.interceptors, tp);result accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager ! null) {transactionManager.maybeAddPartition(tp);}// 如果追加完数据后对应的 RecordBatch 已经达到了 batch.size 的大小或者batch 的剩余空间不足以添加下一条 Record则唤醒 sender 线程发送数据。if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), partition);this.sender.wakeup();}return result.future;} catch (ApiException e) {...}... }Producer 通过 waitOnMetadata() 方法来获取对应 topic 的 metadata 信息需要先该topic 是可用的 Producer 端对 record 的 key 和 value 值进行序列化操作在 Consumer 端再进行相应的反序列化 获取partition值具体分为下面三种情况 1 指明 partition 的情况下直接将指明的值直接作为 partiton 值 2 没有指明 partition 值但有 key 的情况下将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值 3 既没有 partition 值又没有 key 值的情况下第一次调用时随机生成一个整数后面每次调用在这个整数上自增将这个值与 topic 可用的 partition 总数取余得到partition 值也就是常说的 round-robin 算法 4 Producer 默认使用的 partitioner 是org.apache.kafka.clients.producer.internals.DefaultPartitioner 向 accumulator 写数据先将 record 写入到 buffer 中当达到一个 batch.size 的大小时再唤起 sender线程去发送 RecordBatch这里仔细分析一下Producer是如何向buffer写入数据的 1.获取该 topic-partition 对应的 queue没有的话会创建一个空的 queue 2.向 queue 中追加数据先获取 queue 中最新加入的那个 RecordBatch如果不存在或者存在但剩余空余不足以添加本条 record 则返回 null成功写入的话直接返回结果写入成功 3.创建一个新的 RecordBatch初始化内存大小根据 max(batch.size,Records.LOG_OVERHEAD Record.recordSize(key, value)) 来确定防止单条record 过大的情况 4.向新建的 RecordBatch 写入 record并将 RecordBatch 添加到 queue 中返回结果写入成功 发送 RecordBatch当 record 写入成功后如果发现 RecordBatch 已满足发送的条件通常是 queue 中有多个 batch那么最先添加的那些 batch 肯定是可以发送了那么就会唤醒sender 线程发送 RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() 方法中进行的该方法具体实现如下 1.获取那些已经可以发送的 RecordBatch 对应的 nodes 2.如果与node 没有连接如果可以连接,同时初始化该连接,就证明该 node 暂时不能发送数据,暂时移除该 node 3.返回该 node 对应的所有可以发送的 RecordBatch 组成的 batcheskey 是 node.id,并将 RecordBatch 从对应的 queue 中移除 4.将由于元数据不可用而导致发送超时的 RecordBatch 移除 5.发送 RecordBatch 小结 由上图可以看出KafkaProducer有两个基本线程 主线程 负责消息创建拦截器序列化器分区器等操作并将消息追加到消息收集器RecordAccumulator中RecordAccumulator为每个分区都维护了一个Deque 类型的双端队列。ProducerBatch可以理解为是ProducerRecord 的集合批量发送有利于提升吞吐量降低网络影响由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存并维护了一个 BufferPool 实现 ByteBuffer 的复用该缓存池只针对特定大小 batch.size指定的 ByteBuffer进行管理对于消息过大的缓存不能做到重复利用。每次追加一条ProducerRecord消息会寻找/新建对应的双端队列从其尾部获取一个ProducerBatch判断当前消息的大小是否可以写入该批次中。若可以写入则写入若不可以写入则新建一个ProducerBatch判断该消息大小是否超过客户端参数配置 batch.size 的值不超过则以 batch.size建立新的ProducerBatch这样方便进行缓存重复利用若超过则以计算的消息大小建立对应的 ProducerBatch 缺点就是该内存不能被复用了。 Sender线程 该线程从消息收集器获取缓存的消息将其处理为 Node, ListProducerBatch 的形式 Node 表示集群的broker节点。进一步将Node, ListProducerBatch转化为Node, Request形式此时才可以向服务端发送数据。在发送之前Sender线程将消息以 MapNodeId, DequeRequest 的形式保存到InFlightRequests 中进行缓存可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个以实现消息的尽快发出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/81986.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

可信网站多少钱做网站每天更新两篇文章

全表扫描的工作是扫描高水位一下所有的数据块。 这里就有一个问题,什么是高水位线。高水位的标志存在表头。 该数据块以后都是崭新未格式化的数据块,高水位的目的有二。它是全表扫描的 终点,并行插入的起点! 优化全表扫描的办法有…

个人信息网站建设的心得体会网站制作模板软件

Text to speech tool in LinuxeSpeak是一款 Linux 命令行工具,能把文本转换成语音。它是一款简洁的语音合成器,用C语言编写而成,它支持英语和其它多种语言。eSpeak 从标准输入或者输入文件中读取文本。虽然语音输出与真人声音相去甚远。但是&…

教育网站建设改版河北省住房建设厅网站首页

eai app课程大纲 Spring Integration是用于企业应用程序集成的开源框架。 这是一个轻量级的框架,建立在核心Spring框架之上。 它旨在支持开发事件驱动的体系结构和以消息为中心的体系结构典型的集成解决方案。 Spring Integration扩展了Spring编程模型,…

滕州本地网站建设wordpress 增加站长统计

这学期选了门模式识别的课。发现最常见的一种情况就是,书上写的老师ppt上写的都看不懂,然后绕了一大圈去自己查资料理解,回头看看发现,Ah-ha,原来本质的原理那么简单,自己一開始仅仅只是被那些看似formidab…

任县网站建设gif动图素材网站

【作品展示】windows11系统 操作:点击小绿旗进入windows11主页面,不仅是能打开浏览器,还可以进行背景切换等功能。

网站app的意义一个服务器可以备案几个网站

除了 RDKit,还有几个其他的Python库可以用来将SMILES(Simplified Molecular Input Line Entry System)字符串转换为分子图片。这些库通常用于化学信息学和分子建模领域。一些常见的选项包括: rdkit Open Babel Open Babel 命令行…

四川省网站建设广告设计与制作工作内容

PgSQL技术内幕-Bitmap Index Scan 1、简介 Bitmap索引扫描是对索引扫描的一个优化,通过建立位图的方式将原来的随机堆表访问转换成顺序堆表访问。主要分为两点:1)管理每个Bitmap的hash slot没用完时,每个Bitmap代表每个heap页中满…

长沙网站建设的首选免费logo设计一键生成无水印

使用Kafka Streams开发流处理应用 一、简介1.1 Kafka Streams定义1.2 Kafka Streams的优势1.3 Kafka Streams应用场景 二、环境搭建2.1 安装Kafka2.2 安装Kafka Streams2.3 构建Kafka集群 三、Kafka Streams编程API介绍3.1 Kafka Streams主要API3.2 应用程序的配置和参数3.3 To…

网站分析内容广告设计工资高吗

开发环境 ubuntu 20.04 VTK 8.2 编译VTK 下载源码 git clone --recursive https://gitlab.kitware.com/vtk/vtk.git 使用版本管理工具,切换版本到8.2 更改编译选项,这里使用cmake-gui进行配置 1、编译类型修改为Release 2、安装路径可以设置&#xf…

北京网站建设公司华网天下优惠做网站系统具体步骤

产品说明 一、 hook版本:企业微信hook接口是指将企业微信的功能封装成dll,并提供简易的接口给程序调用。通过hook技术,可以在不修改企业微信客户端源代码的情况下,实现对企业微信客户端的功能进行扩展和定制化。企业微信hook接口…

个人网站毕业设计作品电子商务网站建设方案范文

1.安装之前的操作 ps -ef|grep nginx 查看是否有运行 如果有就杀掉 kill -9 pid find / -name nginx 查看nginx文件 rm -rf file /usr/local/nginx* 通通删掉删掉 yum remove nginx 限载一下服务 1.2.下载安装包 地址 nginx: download 2.减压文件 tar…

武钢建工集团建设公司网站网站网页不对称

1. (其它) 掌握Linux下DHCP、DNS、Apache、FTP服务器的安装和配置,在Linux服务器上部署JavaWeb应用 完成单元八的实训内容。 1、安装 JDK 2、安装 MySQL 3、部署JavaWeb应用 安装jdk 教程连接:linux安装jdk8详细步骤-CSDN博客 Jdk来源:linu…

湖北专业网站建设质量保障wordpress category 参数

更相减损法和辗转相除法(GCD)求最小公倍数和最大公约数 标签(空格分隔): 算法 算法竞赛 这两种算法平时经常听到,听起来也很装逼,但是我老是忘了他们的原理,今天好好想想&#xff0c…

品牌网线seo的描述正确

转载自:http://www.cnblogs.com/chenxizhang/p/3280947.html 这一篇文章来谈谈对于WPF应用程序开发中的未捕获异常的处理。 首先,我们当然是要求应用程序开发人员,尽可能地在程序可能出现异常的地方都去捕捉异常,使用try…catch的…

成都网站开发团队旅游建设门户网站的方案

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、opengles3.0 绘制图元介绍二、绘图图元 API 介绍1. glDrawArrays()1.1 glDrawArrays()函数原型1.2 GL_TRIANGLES, GL_TRIANGLE_STRIP, GL_TRIANGLE_FAN 三者的区别1.3 使用GL_TRIANGLES, G…

微信服务号可以做万网站么网站外的seo

modbus 协议 数据格式00 00 00 00 00 06 01 03 00 00 00 02 ascii码第一位:事务处理标识符 由服务器复制,通常为0第二位:事务处理标识符 由服务器复制,通常为0第三位:协议标识符0第四位:协议标识符0第五…

飞色 网站建设H5建网站

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《LInux实战开发》。🎯🎯 …

网站开发例子永嘉规划建设局网站

以太网通讯是一种被广泛使用的数据通讯方式。在嵌入式应用中也经常使用,但协议栈的实现并不是一件容易的事。不过有些以太网控制器就带有协议栈,如W5500。在本篇中我们将讨论如何设计并实现W5500以太网控制器的驱动。 1、功能概述 W5500是WIZnet开发的…

襄阳住房和城乡建设网站亚马逊品牌网站要怎么做

MySQL 部分 1. 查看是否开启 binlog MySQL 8 默认开启 binlog。可以通过以下命令查看是否开启: SHOW VARIABLES LIKE log_bin;如果返回结果为 ON,则表示 binlog 已开启。 Variable_nameValuelog_binON 2. 若未开启 binlog,则需手动配置 …

网页建设网站代码网站备案ip查询网站查询

flask篇之URL重定向(二十三) 通过url_for()函数构造动态的URL: 我们在flask之中不仅仅是可以匹配静态的URL,还可以通过url_for()这个函数构造动态的URL from flask import Flask from flask import url_forapp Flask(__name__)app.route(/) def inde…