Apache-Flink深度解析-DataStream-Connectors之Kafka


Kafka 简介

Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。

Kafka不但是分布式消息系统而且也支持流式计算,所以在介绍Kafka在Apache Flink中的应用之前,先以一个Kafka的简单示例直观了解什么是Kafka。

安装

本篇不是系统的,详尽的介绍Kafka,而是想让大家直观认识Kafka,以便在Apahe Flink中进行很好的应用,所以我们以最简单的方式安装Kafka。

  • 下载二进制包

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz复制代码
  • 解压安装
    Kafka安装只需要将下载的tgz解压即可,如下:

jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE        NOTICE        bin        config        libs        site-docs
复制代码

其中bin包含了所有Kafka的管理命令,如接下来我们要启动的Kafka的Server。

  • 启动Kafka Server
    Kafka是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个Kafka Server 实例。 Kafka需要使用ZooKeeper,要进行投产部署我们需要安装ZooKeeper集群,这不在本篇的介绍范围内,所以我们利用Kafka提供的脚本,安装一个只有一个节点的ZooKeeper实例。如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)复制代码

启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)复制代码

如果上面一切顺利,Kafka的安装就完成了。

创建Topic

Kafka是消息订阅系统,首先创建可以被订阅的Topic,我们创建一个名为flink-tipic的Topic,在一个新的terminal中,执行如下命令:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipicCreated topic "flink-tipic".复制代码

在Kafka Server的terminal中也会输出如下成功创建信息:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...复制代码

上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。

除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181flink-tipic复制代码

如果输出flink-tipic,那么说明我们的Topic成功创建了。

那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为直观,我们看如下Kafka架构示意图简单理解一下:

简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。

发送消息

如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg 
>Kafka connector复制代码

上面我们发送了两条消息Kafka test msgKafka connectorflink-topic Topic中。

读取消息

如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector复制代码

其中--from-beginning 描述了我们从Topic开始位置读取消息。

Flink Kafka Connector

前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka Connector相关的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。

mvn 依赖

要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.7.0</version>
</dependency>复制代码

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。 DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte [] message)方法,从Kafka传递值。

Examples

我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output复制代码

所以示例中我们Source利用flink-topic, Sink用slink-topic-output

Simple ETL

我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serializedeserialize的实现,也就是我们要定义一个实现DeserializationSchemaSerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。

  • KafkaMsgSchema - 完整代码

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaMsgSchema() {// 默认UTF-8编码this(Charset.forName("UTF-8"));}public KafkaMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public String deserialize(byte[] message) {// 将Kafka的消息反序列化为java对象return new String(message, charset);}public boolean isEndOfStream(String nextElement) {// 流永远不结束return false;}public byte[] serialize(String element) {// 将java对象序列化为Kafka的消息return element.getBytes(this.charset);}public TypeInformation<String> getProducedType() {// 定义产生的数据Typeinforeturn BasicTypeInfo.STRING_TYPE_INFO;}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}
}
复制代码
  • 主程序 - 完整代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaExample {public static void main(String[] args) throws Exception {// 用户参数获取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 属性参数 - 实际投产可以在命令行传入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 创建消费者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(sourceTopic,new KafkaMsgSchema(),p);// 设置读取最早的数据
//        consumer.setStartFromEarliest();// 读取Kafka消息DataStream<String> input = env.addSource(consumer);// 数据处理DataStream<String> result = input.map(new MapFunction<String, String>() {public String map(String s) throws Exception {String msg = "Flink study ".concat(s);System.out.println(msg);return msg;}});// 创建生产者FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(sinkTopic,new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 将数据写入Kafka指定Topic中result.addSink(producer);// 执行jobenv.execute("Kafka Example");}
}
复制代码

运行主程序如下:


我测试操作的过程如下:

  1. 启动flink-topicflink-topic-output的消费拉取;

  2. 通过命令向flink-topic中添加测试消息only for test;

  3. 通过命令打印验证添加的测试消息 only for test;

  4. 最简单的FlinkJob source->map->sink 对测试消息进行map处理:"Flink study ".concat(s);

  5. 通过命令打印sink的数据;

#### 内置Schemas
Apache Flink 内部提供了如下3种内置的常用消息格式的Schemas:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用。

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema) 它将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”)作为(Int / String / ...)()从中访问字段。 KeyValue objectNode包含“key”和“value”字段,其中包含所有字段以及可选的"metadata"字段,该字段公开此消息的偏移量/分区/主题。

  • AvroDeserializationSchema 它使用静态提供的模式读取使用Avro格式序列化的数据。 它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(...))推断出模式,或者它可以与GenericRecords一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(...))

要使用内置的Schemas需要添加如下依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.7.0</version>
</dependency>复制代码

读取位置配置

我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置设置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;

  • consumer.setStartFromLatest() - 从最新记录开始;

  • consumer.setStartFromTimestamp(...); // 从指定的epoch时间戳(毫秒)开始;

  • consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

上面的位置指定可以精确到每个分区,比如如下代码:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始consumer.setStartFromSpecificOffsets(specificStartOffsets);复制代码

对于没有指定的分区还是默认的setStartFromGroupOffsets方式。

Topic发现

Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);复制代码

在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。

定义Watermark(Window)

对Kafka Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka Source中定义Watermark。

要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。

  • KafkaWithTsMsgSchema - 完整代码
    要想解析上面的Kafka的数据格式,我们需要开发一个自定义的Schema,比如叫KafkaWithTsMsgSchema,将String#Long解析为一个Java的Tuple2<String, Long>,完整代码如下:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {private static final long serialVersionUID = 1L;private transient Charset charset;public KafkaWithTsMsgSchema() {this(Charset.forName("UTF-8"));}public KafkaWithTsMsgSchema(Charset charset) {this.charset = Preconditions.checkNotNull(charset);}public Charset getCharset() {return this.charset;}public Tuple2<String, Long> deserialize(byte[] message) {String msg = new String(message, charset);String[] dataAndTs = msg.split("#");if(dataAndTs.length == 2){return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));}else{// 实际生产上需要抛出runtime异常System.out.println("Fail due to invalid msg format.. ["+msg+"]");return new Tuple2<String, Long>(msg, 0L);}}@Overridepublic boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {return false;}public byte[] serialize(Tuple2<String, Long> element) {return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);}private void writeObject(ObjectOutputStream out) throws IOException {out.defaultWriteObject();out.writeUTF(this.charset.name());}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();String charsetName = in.readUTF();this.charset = Charset.forName(charsetName);}@Overridepublic TypeInformation<Tuple2<String, Long>> getProducedType() {return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);}
}
复制代码
  • Watermark生成

提取时间戳和创建Watermark,需要实现一个自定义的时间提取和Watermark生成器。在Apache Flink 内部有2种方式如下:

  • AssignerWithPunctuatedWatermarks - 每条记录都产生Watermark。

  • AssignerWithPeriodicWatermarks - 周期性的生成Watermark。

    我们以AssignerWithPunctuatedWatermarks为例写一个自定义的时间提取和Watermark生成器。代码如下:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;import javax.annotation.Nullable;public class KafkaAssignerWithPunctuatedWatermarksimplements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {// 利用提取的时间戳创建Watermarkreturn new Watermark(l);}@Overridepublic long extractTimestamp(Tuple2<String, Long> o, long l) {// 提取时间戳return o.f1;}
}复制代码
  • 主程序 - 完整程序
    我们计算一个大小为1秒的Tumble窗口,计算窗口内最大的值。完整的程序如下:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;public class KafkaWithEventTimeExample {public static void main(String[] args) throws Exception {// 用户参数获取final ParameterTool parameterTool = ParameterTool.fromArgs(args);// Stream 环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 Event-timeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// Source的topicString sourceTopic = "flink-topic";// Sink的topicString sinkTopic = "flink-topic-output";// broker 地址String broker = "localhost:9092";// 属性参数 - 实际投产可以在命令行传入Properties p = parameterTool.getProperties();p.putAll(parameterTool.getProperties());p.put("bootstrap.servers", broker);env.getConfig().setGlobalJobParameters(parameterTool);// 创建消费者FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(sourceTopic,new KafkaWithTsMsgSchema(),p);// 读取Kafka消息TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);DataStream<Tuple2<String, Long>> input = env.addSource(consumer).returns(typeInformation)// 提取时间戳,并生产Watermark.assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());// 数据处理DataStream<Tuple2<String, Long>> result = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).max(0);// 创建生产者FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(sinkTopic,new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),p,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);// 将数据写入Kafka指定Topic中result.addSink(producer);// 执行jobenv.execute("Kafka With Event-time Example");}
}复制代码

测试运行如下

简单解释一下,我们输入数如下:

MsgWatermark
E#10000001000000
A#30000003000000
B#50000005000000
C#50001005000100
E#50001205000120
A#70000007000000

我们看的5000000~7000000之间的数据,其中B#5000000, C#5000100E#5000120是同一个窗口的内容。计算MAX值,按字符串比较,最大的消息就是输出的E#5000120

Kafka携带Timestamps

在Kafka-0.10+ 消息可以携带timestamps,也就是说不用单独的在msg中显示添加一个数据列作为timestamps。只有在写入和读取都用Flink时候简单一些。一般情况用上面的示例方式已经足够了。

小结

本篇重点是向大家介绍Kafka如何在Flink中进行应用,开篇介绍了Kafka的简单安装和收发消息的命令演示,然后以一个简单的数据提取和一个Event-time的窗口示例让大家直观的感受如何在Apache Flink中使用Kafka。


你可能感兴趣的文章

  • Flink入门

  • Flink DataSet&DataSteam API

  • Flink集群部署

  • Flink重启策略

  • Flink分布式缓存

  • Flink重启策略

  • Flink中的Time

  • Flink中的窗口

  • Flink的时间戳和水印

  • Flink广播变量

  • Flink-Kafka-connetor

  • Flink-Table&SQL

  • Flink实战项目-热销排行

  • Flink-Redis-Sink

  • Flink消费Kafka写入Mysql

后面会继续更新更多实战案例...


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/536644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java使用继承的语法是,Java基础语法八 继承

1、超类和子类超类和子类父类与子类多态&#xff1a;一个对象变量可以指示多种实际类型的现象称为多态一个变量可以引用父类对象&#xff0c;也可以引用其子类对象&#xff0c;这就是多态。不能将一个超类的引用赋给子类变量&#xff0c;因为调用子类方法时可能发生运行错误子类…

kaka 1.0.0 重磅发布,服务于后端的事件领域模型框架。

百度智能云 云生态狂欢季 热门云产品1折起>>> kaka 1.0.0正式发布了&#xff0c;从三个月前的kaka-notice-lib 1.0.0的发布&#xff0c;经过多次研磨&#xff0c;终于迎来了本次重大更新。 kaka是一款服务于java后端的事件领域模型框架&#xff0c;主要目的为解耦业…

java配置文件工具类,java项目加载配置文件的工具类

java项目加载配置文件的工具类package com.loadproperties;import java.io.IOException;import java.io.InputStream;import java.util.Properties;public class ConfigUtil {private static InputStream input;private volatile Properties configuration new Properties();/…

如何把WAV格式音频转换为MP3格式

WAV为微软公司&#xff08;Microsoft)开发的一种声音文件格式&#xff0c;它符合RIFF(Resource Interchange File Format)文件规范&#xff0c;用于保存Windows平台的音频信息资源&#xff0c;被Windows平台及其应用程序所广泛支持&#xff0c;因此在声音文件质量和CD相差无几&…

php 异步处理类,php异步处理类

该类可以请求HTTP和HTTPS协议&#xff0c;还可以处理301、302重定向以及GZIP压缩等。[PHP]代码//使用方法require(asynHandle.class.php);$obj new asynHandle();$result $obj->Request(http://baidu.com);$result2 $obj->Get(https://mail.google.com/);echo "{…

恶意软件盯上了加密货币,两家以色列公司受到攻击

近日&#xff0c;网络安全公司Palo Alto Networks威胁研究部门Unit 42发博称&#xff0c;已确认Cardinal RAT自2017年4月起对两家从事外汇和加密交易软件开发的以色列金融科技公司发起过攻击。 Cardinal RAT是可远程访问特洛伊木马&#xff08;RAT&#xff09;&#xff0c;攻击…

php 自定义打印模板下载,PHP – 创建自定义模板系统?

我已经在这里搜索过,令人惊讶的是我找不到答案.我发现了一个类似的线程,但没有真正的解决方案.复杂的部分是循环,如果我不需要循环我可以只是做一个常规替换.所以,我有一个带有一些标记的.html文件,如下所示&#xff1a;{{startloop}}{{imgname}}{{endLoop}}我想要做的是用其他…

腾讯财报中“最大秘密”:2018云收入91亿元,交首份TO B答卷

腾讯财报中“最大秘密”云业务收入又一次被公开了&#xff1a;2018年&#xff0c;腾讯云收入91亿元&#xff0c;增长100%。 3月21日&#xff0c;腾讯发布2018年Q4及全年财报&#xff0c;2018全年收入3126.94亿元同比增长32%&#xff0c;净利润(Non-GAAP)774.69亿元。而被列进“…

根据坐标如何在matlab中l连成曲线,matlab中,如何将两条曲线画在一个坐标系里,plot(x1,x2,y1,y2)还是怎样...

matlab中&#xff0c;如何将两条曲线画在一个坐标系里&#xff0c;plot(x1,x2,y1,y2)还是怎样以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;matlab中&#xff0c;如何将两条曲线画在一个坐…

Android 物联网 传感器

前几天做了一个嵌入式课设。将传感器收集到的数据传到手机制作的APP里。 项目中涉及到的主要的java代码和xml布局文件上传到了github&#xff0c;https://github.com/123JACK123jack/Android转载于:https://www.cnblogs.com/libin123/p/10578601.html

java已被弱化签名,高效Java第四十条建议:谨慎设计方法签名

作用有助于设计易于学习和使用的API。如何做——谨慎地选择方法的名称1.选择易于理解的&#xff0c;并且与同一个包中的其他名称风格一致的名称。2.选择与大众认可的名称相一致的名称。如何做——不要过于追求提供便利的方法每个方法都应该尽其所能。方法太多会使类难以学习、使…

curl有php内存缓存,PHP CURL内存泄露的解决方法

PHP CURL内存泄露的解决方法curl配置平淡无奇&#xff0c;长时间运行发现一个严重问题&#xff0c;内存泄露&#xff01;不论用单线程和多线程都无法避免&#xff01;是curl访问https站点的时候有bug&#xff01;内存泄露可以通过linux的top命令发现&#xff0c;使用php函数mem…

MySQL学习【第五篇SQL语句上】

一.mysql命令 1.连接服务端命令 1.mysql -uroot -p123 -h127.0.0.1 2.mysql -uroot -p123 -S /tmp/mysql.sock 3.mysql -uroot -p123 -hlocalhost 4.mysql -uroot -p123 2.mysql登陆后的一些命令 1.\h或者help   查看帮助 2.\G       格式化查看数据&#xff08;以k…

phpexcel.php linux,phpexcel在linux系统报错如何解决

最近有个tp3.2的项目迁移到linux系统上了&#xff0c;突然有天发现原本在win server 2008上运行没问题的excel导出功能在新的系统上不能使用了。报错如下&#xff1a;说是1762行有问题&#xff0c;找到这个文件的代码看看&#xff1a;/*** Get an instance of this class** acc…

优雅的redux异步中间件 redux-effect

不吹不黑&#xff0c;redux蛮好用。只是有时略显繁琐&#xff0c;叫我定义每一个action、action type、使用时还要在组件上绑定一遍&#xff0c;臣妾做不到呀&#xff01;下面分享一种个人比较倾向的极简写法&#xff0c;仍有待完善&#xff0c;望讨论。 github: github.com/li…

oracle 中累加函数,Oracle 分析函数分组累加!

用户号码 登陆时间13000000002010-01-0113000000012010-01-0113000000022010-01-0213000000012010-01-0213000000032010-01-0313000000022010-01-0313000000042010-01-0413000000032010-01-0413000000042010-01-0213000000062011-01-0413000000012011-01-04剔除重复登陆的用户,…

asp.net core系列 48 Identity 身份模型自定义

一.概述 ASP.NET Core Identity提供了一个框架&#xff0c;用于管理和存储在 ASP.NET Core 应用中的用户帐户。 Identity添加到项目时单个用户帐户选择作为身份验证机制。 默认情况下&#xff0c;Identity可以使用的 Entity Framework (EF) Core 数据模型。 本文介绍如何自定义…

oracle中创建游标,oracle 存储过程创建游标

Oracle与Sql Server差异点详解1、create函数或存储过程异同点Oracle 创建函数或存储过程一般是 create or replace ……SQL SERVER 则是在创建之前加一条语句&#xff0c;先判断是否已经存在&#xff0c;如果存在删除已有的函数或存储过程。函数语句&#xff1a;if exists (sel…

hosts文件不起作用

突然发现电脑的hosts文件不起作用了。之前用的狠正常&#xff0c;近期也没有修改过。首先排除什么格式、DNS、注册表之类的问题。最终解决办法&#xff08;权限问题&#xff1a;有问题的hosts文件图标上有个锁&#xff09;&#xff1a;1.C:\Windows\System32\drivers\etc下复制…

oracle面临的挑战,未来数据库管理员面临的三大挑战

原标题&#xff1a;未来数据库管理员面临的三大挑战前言今天的数据库管理员面临着三大挑战&#xff1a;工作重心向以应用程序为中心转移、支持多个数据库平台的需求、在云端以及在本地管理数据库性能的责任不断扩大。为了在今天和未来都能站稳脚跟&#xff0c;数据库管理员需要…