spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

【README】

0,为啥要看  DefaultKafkaProducerFactory? 最近在基于 springboot 开发kafka模块,发现 kafakTemplate构造器传入了 DefaultKafkaProducerFactory实例, kafkaTemplate内部使用了 很多 DefaultKafkaProducerFactory的方法; 所以把 DefaultKafkaProducerFactory的重点方法分析处理出来,以便于查看 KafkaTemplate的内部逻辑; 

1, 本文涉及的 kafka操作,不涉及事务和消费者,所以本文忽略了有关kafka事务,消费者的描述; kafka事务, refer2  转:Kafka事务使用和编程示例/实例_PacosonSWJTU的博客-CSDN博客Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。注意:kafka事务和DB事务。在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:读取消息或生产消息 kafkaOperation(); /https://blog.csdn.net/PacosonSWJTU/article/details/1213058842,本文结合了 api doc 对 DefaultKafkaProducerFactory-默认kafka生产者工厂的重点方法进行介绍;  

3,DefaultKafkaProducerFactory 类代码结构包括(小结):

  1. 创建原生kafka生产者并包装到 CloseSafeProducer类中;
  2. 创建带有事务的 kafka 生产者;
  3. 为每个线程都创建一个kafka 生产者;
  4. 内部类 CloseSafeProducer-关闭安全生产者;
    1. 发送消息;
    2. 刷新kafka缓存到服务器;
    3. kafka事务操作(本文不涉及),包括开启或提交或中断事务,提交偏移量等; 
    4. 关闭生产者; 

【1】 类描述

类描述:

单例共享 Producer 实例的 ProducerFactory 实现。

此实现将为每次 createProducer() 调用时提供的 Map 配置和可选的 Serializer 实现返回相同的 Producer 实例(如果未启用事务)。

如果您使用的序列化器没有参数构造函数并且不需要设置,那么最简单的方法是在传递给 DefaultKafkaProducerFactory 构造函数的配置中针对 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG 和 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG 键指定序列化器类。

如果这是不可能的,但您确定以下至少一项是正确的:

  • 1 只有一个生产者会使用序列化程序。
  • 2 您正在使用可以在 Producer 实例之间共享的序列化程序(特别是它们的 close() 方法是无操作的)。
  • 3 您确定没有任何单个 Producer 被关闭的风险,而其他具有相同序列化程序的 Producer 实例正在使用中。

然后您可以为键和值序列化程序之一或两者传入 Serializer 实例。

如果以上都不是真的,那么您可以为一个或两个序列化程序提供一个供应商函数,每次工厂创建生产者时,该函数将用于获取序列化程序。

Producer 被包装,并且在调用 Producer.close() 时实际上并未关闭底层的 KafkaProducer 实例。当调用 DisposableBean.destroy() 或应用程序上下文发布 ContextStoppedEvent 时,KafkaProducer 物理关闭。你也可以调用reset()。

设置 setTransactionIdPrefix(String) 启用事务;在这种情况下,会维护生产者的缓存;关闭生产者将其返回到缓存。当工厂被销毁、应用程序上下文停止或调用 reset() 方法时,生产者将关闭并清除缓存。

public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactoryimplements ProducerFactory<K, V>, ApplicationContextAware,BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {

 【2】构造器

使用提供的配置和序列化程序供应商构建工厂。

如果提供,还可以把 transactionIdPrefix 配置为ProducerConfig.TRANSACTIONAL_ID_CONFIG 的值。

此配置将被目标 Producer 实例的后缀覆盖。

public DefaultKafkaProducerFactory(Map<String, Object> configs,@Nullable Supplier<Serializer<K>> keySerializerSupplier,@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {this.configs = new ConcurrentHashMap<>(configs);this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;// clientId 表示kafka生产者idif (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);}// 是否开启事务String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);if (StringUtils.hasText(txId)) {setTransactionIdPrefix(txId);this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);}this.configs.put("internal.auto.downgrade.txn.commit", true);
}

【特别注意】我们可以不传入 key序列化器,value序列化器 对象到 DefaultKafkaProducerFactory构造器(即设置为null) , 而把序列化器全限定类名设置在 configs 属性里面,因为原生kafka生产者的构造器可以读取配置中的序列化器类,如下:

new DefaultKafkaProducerFactory<>(0
(Map) PPKafkaClusterManager.INSTANCE.getKafkaClusterProps(kafkaClusterName));// 不传入key value的序列化器,默认为null 

 kafka 原生构造器如下:

// 通过反射创建序列化器对象 
public <T> T getConfiguredInstance(String key, Class<T> t) {Class<?> c = getClass(key);if (c == null)return null;Object o = Utils.newInstance(c);if (!t.isInstance(o))throw new KafkaException(c.getName() + " is not an instance of " + t.getName());if (o instanceof Configurable)((Configurable) o).configure(originals());return t.cast(o);}

【3】方法介绍

【3.1】设置物理关闭生产者超时时间

public void setPhysicalCloseTimeout(int physicalCloseTimeout) {this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
}

通过工厂物理关闭生产者而不是producer自身关闭的等待时间(当 {@link #reset()}、{@link #destroy() #closeProducerFor(String)} 或 {@link #closeThreadBoundProducer( )} 被调用)。 以秒为单位; 默认{@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}。


【3.2】设置事务id 前缀(开启事务)

public final void setTransactionIdPrefix(String transactionIdPrefix) {Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");this.transactionIdPrefix = transactionIdPrefix;enableIdempotentBehaviour();
}
// 事务可用? 
@Override
public boolean transactionCapable() {return this.transactionIdPrefix != null;
}

为 ProducerConfig.TRANSACTIONAL_ID_CONFIG 配置设置前缀。 默认情况下,来自配置的 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值用作目标生产者配置中的前缀。

【3.3】 设置是否每个线程产生一个生产者

public void setProducerPerThread(boolean producerPerThread) {this.producerPerThread = producerPerThread;
}

设置为 true 为每个线程创建一个生产者,而不是由所有客户端共享的单例。 当不再需要生产者时,客户端必须调用 closeThreadBoundProducer() 来物理关闭生产者。 这些生产者不会被 destroy() 或 reset() 关闭。 


【3.4】创建kafka生产者方法(重点)*

3个外观方法

@Override
public Producer<K, V> createProducer() {return createProducer(this.transactionIdPrefix);
}@Override
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;return doCreateProducer(txIdPrefix);
}@Override
public Producer<K, V> createNonTransactionalProducer() {return doCreateProducer(null);
}

底层方法

private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {if (txIdPrefix != null) { // 使用kafka事务if (this.producerPerConsumerPartition) {return createTransactionalProducerForPartition(txIdPrefix);}else {return createTransactionalProducer(txIdPrefix);}}if (this.producerPerThread) { // 每个线程一个生产者 return getOrCreateThreadBoundProducer();}synchronized (this) { // 我们走这里,synchronized同步代码块 if (this.producer != null && expire(this.producer)) {this.producer = null;}if (this.producer == null) {this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));}return this.producer;}
}

【代码解说】 以上方法有4个分支,包括创建分区带事务的生产者, 带事务的生产者,每个线程一个生产者,普通生产者;

【3.4.1】 创建分区带事务的生产者 createTransactionalProducerForPartition(txIdPrefix)

因为方法过于复杂,放在文末说明

【3.4.2】 带事务的生产者 createTransactionalProducer(txIdPrefix)

因为方法过于复杂,放在文末说明

【3.4.3】 每个线程一个生产者 getOrCreateThreadBoundProducer

该方法就是把 生产者放入了线程级变量 ThreadLocal; 仅此而已

private Producer<K, V> getOrCreateThreadBoundProducer() {// 从线程级变量获取CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {closeThreadBoundProducer();tlProducer = null;}if (tlProducer == null) {tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,this.physicalCloseTimeout, this.beanName, this.epoch.get());for (Listener<K, V> listener : this.listeners) {listener.producerAdded(tlProducer.clientId, tlProducer);}this.threadBoundProducers.set(tlProducer); // 放入线程级遍历 }return tlProducer;
}

threadBoundProducers 就是线程级遍历


【3.4.4】 普通生产者*(本文所关注的方法)

synchronized块中 可以保证所有客户端线程复用同一个 kafka生产者,只要这个kafka没有过期;即便过期,它会重新创建一个 kafka生产者;

synchronized中调用了expire(this.producer)判断是否过期

private boolean expire(CloseSafeProducer<K, V> producer) {boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;if (expired) {producer.closeDelegate(this.physicalCloseTimeout, this.listeners);}return expired;
}

接着 创建了 CloseSafeProducer -关闭安全的生产者, 这是一个内部类

protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

在 调用CloseSafeProducer 构造器时传入了 kafka生产者, 由 createKafkaProducer() 获取; createKafkaProducer() 方法如下:

protected Producer<K, V> createKafkaProducer() {Map<String, Object> newConfigs;if (this.clientIdPrefix == null) {// 是否有生产者客户端id ,这个值可以为空,newConfigs = new HashMap<>(this.configs);}else {newConfigs = new HashMap<>(this.configs);newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());}checkBootstrap(newConfigs);return createRawProducer(newConfigs);
}// 创建原生kafka生产者 
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {Producer<K, V> kafkaProducer =new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());for (ProducerPostProcessor<K, V> pp : this.postProcessors) {kafkaProducer = pp.apply(kafkaProducer);}return kafkaProducer;
}

 子类必须返回一个原生生产者(kafka生产者),该生产者将被包装在 DefaultKafkaProducerFactory.CloseSafeProducer 中。


【4】内部类-保证关闭安全的kafka生产者-CloseSafeProducer

1, 该类实现了 接口  Producer;

【4.1】构造器

上文调用  CloseSafeProducer构造器创建 生产者对象,

 其中 removeProducer 使用了 java8语法的方法引用,如下:

构造器有1个外观

CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,String factoryName, int epoch) {this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
}

底层构造器

CloseSafeProducer(Producer<K, V> delegate,BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,Duration closeTimeout, String factoryName, int epoch) {Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");this.delegate = delegate; // 原生kafka生产者 this.removeProducer = removeProducer; // 移除生产者的方法this.txIdPrefix = txIdPrefix; // 事务id前缀 this.closeTimeout = closeTimeout; // 关闭超时时间 Map<MetricName, ? extends Metric> metrics = delegate.metrics(); // 指标Iterator<MetricName> metricIterator = metrics.keySet().iterator();// 指标迭代器String id;if (metricIterator.hasNext()) {id = metricIterator.next().tags().get("client-id");}else {id = "unknown";}this.clientId = factoryName + "." + id; // 客户端idthis.created = System.currentTimeMillis(); // 创建时间为当前时间(毫秒)this.epoch = epoch; // 副本时代版本号LOGGER.debug(() -> "Created new Producer: " + this);
}

【代码解说】

delegate,表示代理,这里指 原生kafka生产者;

很显然, 构造器没有啥复杂逻辑,就是赋值而已;


【4.2】发送消息

发送消息有2个重载方法 

方法1, 直接发送 ProducerRecord

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record);
}

方法2,带回调的发送方法,消息发送完成后的回调(成功或失败),回调方法中可以获取发送消息所在的偏移量和分区等信息(metadata) ;

这个方法先执行了 原生kafka发送消息分发的回调, 回调执行完成后,在执行 外层传入的回调;

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {LOGGER.trace(() -> toString() + " send(" + record + ")");return this.delegate.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception instanceof OutOfOrderSequenceException) {CloseSafeProducer.this.producerFailed = exception;close(CloseSafeProducer.this.closeTimeout);}callback.onCompletion(metadata, exception);}});
}

【4.3】flush 刷新缓冲区

直接调用 原生kakfa生产者delegate的flush 方法; 

public void flush() {LOGGER.trace(() -> toString() + " flush()");this.delegate.flush();
}

【4.4】kafka事务操作方法

初始化事务, initTransactions;

开启事务, beginTransaction ;

发送偏移量到事务, sendOffsetsToTransaction ;

提交事务, commitTransaction ;

中断事务, abortTransaction ;


【4.5】关闭生产者方法(非常重要*)

有2个方法关闭生产者;

【4.5.1】 close(Duration timeout )

public void close(@Nullable Duration timeout) {LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")");if (!this.closed) {if (this.producerFailed != null) {LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);this.closed = true;this.removeProducer.test(this, this.producerFailed instanceof TimeoutException? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT: timeout);}else {this.closed = this.removeProducer.test(this, timeout);}}
}

调用 remoteProducer() 移除生产者

// 删除单个共享生产者和线程绑定实例(如果存在)。
protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove
, Duration timeout) {if (producerToRemove.closed) {if (producerToRemove.equals(this.producer)) {this.producer = null;producerToRemove.closeDelegate(timeout, this.listeners);}this.threadBoundProducers.remove();return true;}else {return false;}
}

 

也就是说,不是每一次发送消息完成,都关闭kafka生产者

  1. 那什么时候才会关闭 ?
  2. 当 producerToRemove.closed 为true时,才会执行关闭;那 producerToRemove.closed 即代理.closed 何时才会设置为true ?
  3. 当 this.profucerFailed 不为空的时候; 那 this.profucerFailed  什么情况下不为空 ?
  4. 原来在 发送消息的回调方法里,若发送消息抛出异常,则设置 profucerFailed为异常对象,即不为空;如下图所示

【总结】 何时关闭kafka生产者 ?(干货——非常重要) *

当发送消息抛出异常时,关闭kafka生产者; 

上述代码还是调用了  producerToRemove.closeDelegate(timeout, this.listeners);

其中 producerToRemove 就是 producer(CloseSafeProducer )的 closeDelegate(),下文所述;


 【4.5.2】 CloseSafeProducer.closeDelegate

调用了 原生kafka生产者的close 方法;

把生产者从监听器中移除;

void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {this.delegate.close(timeout == null ? this.closeTimeout : timeout);listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));this.closed = true;
}


【补充】创建生产者

【代码解说】 以上方法有4个分支,包括创建分区带事务的生产者, 带事务的生产者,每个线程一个生产者,普通生产者;

【3.4.1】 创建分区带事务的生产者 createTransactionalProducerForPartition(txIdPrefix)

protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {String suffix = TransactionSupport.getTransactionIdSuffix();if (suffix == null) {return createTransactionalProducer(txIdPrefix);}else {synchronized (this.consumerProducers) {CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);if (consumerProducer == null || expire(consumerProducer)) {CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,this::removeConsumerProducer);this.consumerProducers.put(suffix, newProducer);return newProducer;}else {return consumerProducer;}}}
}
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
  • 1) 如果后缀为空,调用  createTransactionalProducer 创建生产者 ; 先从缓存中的阻塞队列中获取,若获取不到,则 调用 doCreateTxProducer  创建;

获取缓存 getCache() 方法如下:
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {if (txIdPrefix == null) {return null;}return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
}// cache缓存定义
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache= new ConcurrentHashMap<>();
  •  2,不为空, 调用 doCreateTxProducer 创建生产者; 它也调用了  createRawProducer 根据原生 kafka生产者 创建  CloseSafeProducer

【3.4.2】 带事务的生产者 createTransactionalProducer(txIdPrefix)

refer2【3.4.1】分支1。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringSecurity】【JJWT】JJWT踩坑LocalDateTime

前言 最近自己又在开始闲搞&#xff0c;主要原因还是下山无望&#xff08;买显卡&#xff09;。只能晚上下班找点事情做啦~~ 环境 版本请根据实际情况参考JJWT官网选择使用&#xff0c;这里只说明一下问题大概思路&#xff01; <!-- 增加token生成依赖 --> <depen…

针对Linux ASP.NET MVC网站中 httpHandlers配置无效的解决方案

近期有Linux ASP.NET用户反映&#xff0c;在MVC网站的Web.config中添加 httpHandlers 配置用于处理自定义类型&#xff0c;但是在运行中并没有产生预期的效果&#xff0c;服务器返回了404&#xff08;找不到网页&#xff09;错误。经我亲自测试&#xff0c;在WebForm网站中&…

简单介绍Java中Comparable和Comparator

转载自 简单介绍Java中Comparable和ComparatorComparable 和 Comparator是Java核心API提供的两个接口&#xff0c;从它们的名字中&#xff0c;我们大致可以猜到它们用来做对象之间的比较的。但它们到底怎么用&#xff0c;它们之间有又哪些差别呢&#xff1f;下面有两个例子可以…

spring-kafka整合:KafkaTemplate-kafka模板类介绍

【README】 1&#xff0c;本文主要关注 KafkaTemplate的重点方法&#xff0c;并非全部方法&#xff1b; 2&#xff0c;KafkaTemplate 底层依赖于 DefaultKafkaProducerFactory &#xff0c; 关于 DefaultKafkaProducerFactory 的介绍&#xff0c;refer2 spring-kafka整合:…

安卓 on a null object reference_详解Object.prototype.__proto__

Object.prototype 的 __proto__ 属性是一个访问器属性(一个getter函数和一个setter函数), 暴露了通过它访问的对象的内部[[Prototype]] (一个对象或 null)。使用__proto__是有争议的&#xff0c;也不鼓励使用它。因为它从来没有被包括在EcmaScript语言规范中&#xff0c;但是现…

【SpringBoot】服务器JVM远程调试

目的 当系统部署到测试环境服务器时&#xff0c;难免会遇到bug。这个时候如果能远程调试&#xff0c;那么能够大大提高我们的生产效率&#xff0c;快速完成服务调试&#xff0c;最快发布生产环境。&#xff08;领导好评不就到手了&#xff09; 准备 Idea&#xff08;Java最好…

图说:为什么Java中的字符串被定义为不可变的

转载自 图说&#xff1a;为什么Java中的字符串被定义为不可变的字符串&#xff0c;想必大家最熟悉不过了&#xff0c;通常我们在代码中有几种方式可以创建字符串&#xff0c;比如&#xff1a;String s "Hollis";这时&#xff0c;其实会在堆内存中创建一个字符串对象…

值得推荐的微软技术公众号推荐

为开阔技术人眼界&#xff0c;促进技术人职业成长。小二在此诚意推荐最值得关注的微软技术公众号。平时关注推送的文章或多或少要么学到知识技能&#xff0c;要么收到一些启发&#xff0c;有利于个人成长。这些公众号为笔者个人积累&#xff0c;不一定都合大家口味&#xff0c;…

springboot:BeanPostProcessor示例及分析

【README】 1&#xff0c;本文主要分析 BeanPostProcessor 的作用&#xff0c; 开发方式&#xff1b; 2&#xff0c;BeanPostProcessor 是bean后置处理器&#xff0c; 简而言之就是bean被创建好了&#xff0c;之后如果需要对其属性进行修改&#xff0c;则 需要使用 BeanPost…

实现了某一个接口的匿名类的例子_java中的内部类内部接口详解,一文搞定

简介一般来说&#xff0c;我们创建类和接口的时候都是一个类一个文件&#xff0c;一个接口一个文件&#xff0c;但有时候为了方便或者某些特殊的原因&#xff0c;java并不介意在一个文件中写多个类和多个接口&#xff0c;这就有了我们今天要讲的内部类和内部接口。内部类先讲内…

Java 8 日期和时间解读

转载自 Java 8 日期和时间解读现在&#xff0c;一些应用程序仍然在使用java.util.Date和java.util.Calendar API和它们的类库&#xff0c;来使我们在生活中更加轻松的处理日期和时间&#xff0c;比如&#xff1a;JodaTime。然而&#xff0c;Java 8 引进的新的类库来处理日期和时…

云计算产值将超3000亿美元 亚马逊微软谷歌居三甲

腾讯科技讯 3月27日消息&#xff0c;据外电报道&#xff0c;云计算曾经主要是无法承担建造和维护基础设施的初创公司的解决方案&#xff0c;但对于管理数字业务的大型企业而言&#xff0c;云计算正快速成为省钱的管理数字业务的方式。市场调研公司IDC在上月的一份调查数据显示&…

oracle中join另一个表后会查询不出一些数据_面试必备 | 8个Hive数据仓工具面试题锦集!...

是新朋友吗&#xff1f;记得先点蓝字关注我哦&#xff5e;今日课程菜单Java全栈开发 | Web前端H5大数据开发 | 数据分析人工智能Python | 人工智能物联网进入数据时代&#xff0c;大数据技术成为互联网发展的核心要素之一。与此同时大数据开发工程师的薪资也成为行业内高薪的代…

springboot-Initializer例子及分析

【README】 1&#xff0c;本文主要编写了 初始化器例子并分析了其调用路径&#xff1b; 2&#xff0c;初始化器的执行顺序 先于 后置处理器&#xff1b; 后置处理器&#xff0c;refer2 springboot&#xff1a;BeanPostProcessor示例及分析_PacosonSWJTU的博客-CSDN博客【RE…

ASP.NET 开发人员不必担心 Node 的五大理由

哦别误会……我真的很喜欢 Node&#xff0c;而且我觉得它提出的概念和模式将在很长一段时间内&#xff0c;对服务端 Web 编程产生深远的影响。即使随着时间的推移 Node 过气了&#xff0c;我们肯定可以从下一个牛逼玩意身上或多或少的感觉到它的影响(不管好的和/或坏的)。而在这…

Spring Boot面试题

转载自 Spring Boot面试题 Spring Boot 是微服务中最好的 Java 框架. 我们建议你能够成为一名 Spring Boot 的专家。问题一 Spring Boot、Spring MVC 和 Spring 有什么区别&#xff1f; SpringFrameSpringFramework 最重要的特征是依赖注入。所有 SpringModules 不是依赖注入就…

synchronized原理_Java并发编程 -- synchronized保证线程安全的原理

线程安全是并发编程中的重要关注点&#xff0c;应该注意到的是&#xff0c;造成线程安全问题的主要诱因有两点&#xff0c;一是存在共享数据(也称临界资源)&#xff0c;二是存在多条线程共同操作共享数据。因此为了解决这个问题&#xff0c;我们可能需要这样一个方案&#xff0…

转:Java 7 种阻塞队列详解

转自&#xff1a; Java 7 种阻塞队列详解 - 云社区 - 腾讯云队列&#xff08;Queue&#xff09;是一种经常使用的集合。Queue 实际上是实现了一个先进先出&#xff08;FIFO&#xff1a;First In First Out&#xff09;的有序表。和 List、Set ...https://cloud.tencent.com/de…

微软Build 2016前瞻:让开发者编写能畅行所有设备的app

本周三&#xff0c;5000名软件开发者将齐聚旧金山莫斯康展览中心参加微软公司年度开发者大会&#xff08;Build 2016&#xff09;&#xff0c;和往年一样&#xff0c;微软在大会上发布了一系列新的技术支持。 据透露&#xff0c;微软将会让开发人员编写可以在任何Windows设备上…

XSS的那些事儿

转载自 XSS的那些事儿XSS是什么XSS&#xff0c;Cross-site scripting&#xff0c;跨站脚本攻击&#xff0c;为了区分与CSS&#xff0c;起名为XSS。黑客利用网站的漏洞&#xff0c;通过代码注入的方式将一些包含了恶意攻击脚本程序注入到网页中&#xff0c;企图在用户加载网页时…