1.chronicle queue介绍
Chronicle Queue使用一个内存映射文件来持久化每一条消息。这使我们能够在进程之间共享消息。它直接将数据存储到堆外内存,因此,使其没有GC开销。它被设计用来为高性能应用程序提供低延迟的消息框架。使用开源的Chronicle Queue可以创建巨大的持久队列,同时保持可预测和一致的低延迟。
Chronicle Queue有三个概念的特点。
Excerpt – is a data container
Appender – appender is used for writing data
Trailer – is used for sequentially reading data
我们将使用Chronicle接口为读写操作保留这部分内存。
创建queue
File queueDir = Files.createTempDirectory("chronicle-queue").toFile();
Chronicle chronicle = ChronicleQueueBuilder.indexed(queueDir).build();
我们将需要一个基础目录,队列将在内存映射的文件中持久保存记录。
ChronicleQueueBuilder类提供不同类型的队列。在这种情况下,我们使用了IndexedChronicleQueue,它使用顺序索引来维护队列中记录的内存偏移。
向队列写东西。
为了将项目写入队列,我们需要使用Chronicle实例创建一个ExcerptAppender类的对象。下面是向队列中写入消息的示例代码。
下面是向队列中写入消息的示例代码。
ExcerptAppender appender = chronicle.createAppender();
appender.startExcerpt();String stringVal = "Hello World";
int intVal = 101;
long longVal = System.currentTimeMillis();
double doubleVal = 90.00192091d;appender.writeUTF(stringValue);
appender.writeInt(intValue);
appender.writeLong(longValue);
appender.writeDouble(doubleValue);
appender.finish();
创建appender后,我们将使用startExcerpt方法启动appender。它以默认的消息容量128K启动一个Excerpt。我们可以使用startExcerpt的重载版本来提供一个自定义容量。
一旦启动,我们可以使用库中提供的各种写法将任何文字或对象值写到队列中。
最后,当我们写完后,我们将完成摘录,把数据保存到队列中,以后再保存到光盘中。
从队列中读取信息
使用ExcerptTrailer实例可以轻松地从队列中读取值。
它就像我们在Java中用来遍历一个集合的迭代器。
让我们从队列中读取数值。
ExcerptTailer tailer = chronicle.createTailer();
while (tailer.nextIndex()) {tailer.readUTF();tailer.readInt();tailer.readLong();tailer.readDouble();
}
tailer.finish();
在创建完预告片后,我们使用nextIndex方法来检查是否有新的摘录可以阅读。一旦ExcerptTailer有一个新的Excerpt要读取,我们可以使用一系列的read方法来读取字面和对象类型值的信息。
2.代码工程
目标是创建一个持久、并发、低延迟、可从多个进程访问并且可以容纳数十亿个对象的队列
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>Chronicle-Queue</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>net.openhft</groupId><artifactId>chronicle-queue</artifactId><version>5.25ea12</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency></dependencies>
</project>
entity
package com.et.chronicle.queue.entity;import net.openhft.chronicle.wire.Marshallable;/*** 该类实现 `net.openhft.chronicle.wire.Marshallable` 并覆盖 `toString` 方法以实现更高效的序列化*/
public class Person implements Marshallable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public Person() {super();}@Overridepublic String toString() {return Marshallable.$toString(this);}
}
代码仓库
https://github.com/Harries/springboot-demo
3.测试
测试文本
package com.et.chronicle.queue;import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;class TestQueueMain {ChronicleQueue queue;@BeforeEachvoid setUp() throws Exception {String basePath = OS.getTarget() + "/Queue1";queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();}@AfterEachvoid tearDown() throws Exception {queue.close();}/*** 测试最简单的写入字符串*/@Testvoid testWtite() {ExcerptAppender appender = queue.acquireAppender();try {for (int i = 0; i < 50000; i++) {appender.writeText("Hello World(hello world)!--" + i);}} finally {appender.close();}}/*** 测试最简单的读取字符串*/@Testvoid testRead() {ExcerptTailer tailer = queue.createTailer("reader1"); //@wjw_note: 如果是createTailer()方法,没有给定名称,会一直能读到最后的数据而不会移动索引try {String readText = null;while ((readText = tailer.readText()) != null) {System.out.println("read: " + readText);}} finally {tailer.close();}}}
测试对象
package com.et.chronicle.queue;import com.alibaba.fastjson.JSONObject;
import com.et.chronicle.queue.entity.Person;
import com.fasterxml.jackson.databind.util.JSONPObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;class TestQueueObject {ChronicleQueue queue;@BeforeEachvoid setUp() throws Exception {String basePath = OS.getTarget() + "/QueueDocument";queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();}@AfterEachvoid tearDown() throws Exception {queue.close();}/*** 测试读写实现了Marshallable接口的对象*/@Testvoid testMarshallable() {ExcerptAppender appender = queue.acquireAppender();try {for (int i = 0; i < 5; i++) {Person person = new Person();person.setName("Rob");person.setAge(40 + i);appender.writeDocument(person);}} finally {appender.close();}ExcerptTailer tailer = queue.createTailer("reader1");try {Person person2 = new Person();while (tailer.readDocument(person2)) {System.out.println(JSONObject.toJSON(person2));}} finally {appender.close();}}
}
4.引用
https://www.baeldung.com/java-chronicle-queue
http://www.liuhaihua.cn/archives/710334.html
https://chronicle.software/quick-start/?product=queue