使用Java 8处理并行数据库流

什么是并行数据库流?

阅读这篇文章,了解如何使用并行流和Speedment并行处理数据库中的数据。 在许多情况下,并行流可能比通常的顺序流快得多。

随着Java 8的引入,我们得到了期待已久的Stream库。 流的优点之一是使流并行非常容易。 基本上,我们可以采用任何流,然后只应用方法parallel()获得并行流,而不是顺序流。 默认情况下,并行流由公共ForkJoinPool执行。

平行

尖塔和公爵并行工作

因此,如果我们有工作量相对较高的工作项,那么并行流通常是有意义的。如果要在并行流管线中执行的工作项在很大程度上未耦合并且在几个线程相对较低。 同样,合并并行结果的努力也必须相对较低。

Speedment是开源的ORM Java工具包和RuntimeJava工具,它将现有的数据库及其表包装到Java 8流中。 我们可以使用现有的数据库并运行Speedment工具,它将生成与我们使用该工具选择的表相对应的POJO类。


Speedment的一项很酷的功能是,数据库流使用标准的Stream语义支持并行性。 这样, 顺序处理流相比,我们可以轻松地并行处理数据库内容并更快地产生结果!

加速入门

访问GitHub上的开放源Speedment ,了解如何开始Speedment项目。 将工具连接到现有数据库应该非常容易。

在本文中,下面的示例使用以下MySQL表。

CREATE TABLE `prime_candidate` (`id` int(11) NOT NULL AUTO_INCREMENT,`value` bigint(20) NOT NULL,`prime` bit(1) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB;

这个想法是人们可以在表中插入值,然后我们将编写一个应用程序来计算插入的值是否是质数。 在实际情况下,我们可以使用MySQL,PostgreSQL或MariaDB数据库中的任何表。

编写顺序流解决方案

首先,我们需要一个方法,如果值是素数则返回。 这是一种简单的方法。 请注意, 故意使算法变慢,因此我们可以清楚地了解并行流在昂贵的操作上的效果。

public class PrimeUtil {/*** Returns if the given parameter is a prime number.** @param n the given prime number candidate* @return if the given parameter is a prime number*/static boolean isPrime(long n) {// primes are equal or greater than 2 if (n < 2) {return false;}// check if n is evenif (n % 2 == 0) {// 2 is the only even prime// all other even n:s are notreturn n == 2;}// if odd, then just check the odds// up to the square root of n// for (int i = 3; i * i <= n; i += 2) {//// Make the methods purposely slow by// checking all the way up to nfor (int i = 3; i <= n; i += 2) {if (n % i == 0) {return false;}}return true;}}

同样,这篇文章的目的不是设计一种有效的质数确定方法。

使用这种简单的质数方法,我们现在可以轻松编写一个Speedment应用程序,该应用程序将扫描数据库表以查找未确定的质数候选者,然后将确定它们是否为质数并相应地更新表。 看起来可能是这样:

final JavapotApplication app = new JavapotApplicationBuilder().withPassword("javapot") // Replace with the real password.withLogging(LogType.STREAM).build();final Manager<PrimeCandidate> candidates = app.getOrThrow(PrimeCandidateManager.class);candidates.stream().filter(PrimeCandidate.PRIME.isNull())                      // Filter out undetermined primes.map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue())))   // Sets if it is a prime or not.forEach(candidates.updater());                             // Applies the Manager's updater

最后一部分包含有趣的内容。 首先,我们在“ prime”列为
使用stream().filter(PrimeCandidate.PRIME.isNull())方法stream().filter(PrimeCandidate.PRIME.isNull()) null 。 重要的是要了解,Speedment流实现将识别过滤谓词,并能够使用该谓词来减少从数据库中实际提取的候选者的数量(例如,“ SELECT * FROM FROM WHERE prime IS NULL”将使用)。

然后,对每个这样的总理候选人PC,我们无论是“黄金”列设置为true ,如果pc.getValue()是一个主要的或false ,如果pc.getValue()是不是一个素数。 有趣的是, pc.setPrime()方法返回实体pc本身,使我们能够轻松地标记多个流操作。 在最后一行,我们通过应用candidates.updater()函数update。 candidates.updater()将检查结果更新数据库。 因此,该应用程序的主要功能实际上是单行的(分为五行以提高可读性)。

现在,在测试应用程序之前,我们需要生成一些测试数据输入。 这是使用Speedment如何完成的示例:

final JavapotApplication app = new JavapotApplicationBuilder().withPassword("javapot") // Replace with the real password.build();final Manager<PrimeCandidate> candidates = app.getOrThrow(PrimeCandidateManager.class);final Random random = new SecureRandom();// Create a bunch of new prime candidatesrandom.longs(1_100, 0, Integer.MAX_VALUE).mapToObj(new PrimeCandidateImpl()::setValue)  // Sets the random value .forEach(candidates.persister());              // Applies the Manager's persister function

同样,我们只需几行代码就可以完成我们的任务。

尝试默认的并行流

如果要并行化流,则只需向以前的解决方案中添加一个方法即可:

candidates.stream().parallel()                                 // Now indicates a parallel stream.filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidates.updater());             // Applies the Manager's updater

和我们平行! 但是,默认情况下,Speedment使用Java的默认并行化行为(在Spliterators::spliteratorUnknownSize定义),该行为针对非计算密集型操作进行了优化。 如果我们分析Java的默认并行化行为,我们将确定它将对第一个1024个工作项使用第一个线程,对随后的2 * 1024 = 2048个工作项使用第二个线程,然后对第三个线程使用3 * 1024 = 3072个工作项线程等。

这对我们的应用程序不利,因为每个应用程序的成本都很高。 如果我们正在计算1100个主要候选对象,我们将仅使用两个线程,因为第一个线程将处理前1024个项目,第二个线程将处理其余的76个项目。现代服务器的线程要多得多。 阅读下一节,了解如何解决此问题。

内置并行化策略

速度有许多内置的并行化策略,我们可以根据工作项的预期计算需求进行选择。 这是对仅具有一种默认策略的Java 8的改进。 内置的并行策略是:

@FunctionalInterface
public interface ParallelStrategy {/*** A Parallel Strategy that is Java's default <code>Iterator</code> to* <code>Spliterator</code> converter. It favors relatively large sets (in* the ten thousands or more) with low computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityDefault() {...}/*** A Parallel Strategy that favors relatively small to medium sets with* medium computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityMedium() {...}/*** A Parallel Strategy that favors relatively small to medium sets with high* computational overhead.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityHigh() {...}/*** A Parallel Strategy that favors small sets with extremely high* computational overhead. The set will be split up in solitary elements* that are executed separately in their own thread.** @return a ParallelStrategy*/static ParallelStrategy computeIntensityExtreme() {...}<T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics);static ParallelStrategy of(final int... batchSizes) {return new ParallelStrategy() {@Overridepublic <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {return ConfigurableIteratorSpliterator.of(iterator, characteristics, batchSizes);}};}

应用并行策略

我们要做的唯一一件事就是为这样的管理器配置并行化策略,我们很高兴:

Manager<PrimeCandidate> candidatesHigh = app.configure(PrimeCandidateManager.class).withParallelStrategy(ParallelStrategy.computeIntensityHigh()).build();candidatesHigh.stream() // Better parallel performance for our case!.parallel().filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidatesHigh.updater());

ParallelStrategy.computeIntensityHigh()策略会将工作项分解成更小的块。 因为我们现在将使用所有可用的线程,所以这将使我们获得更好的性能。 如果我们深入研究,可以看到该策略的定义如下:

private final static int[] BATCH_SIZES = IntStream.range(0, 8).map(ComputeIntensityUtil::toThePowerOfTwo).flatMap(ComputeIntensityUtil::repeatOnHalfAvailableProcessors).toArray();

这意味着,在具有8个线程的计算机上,它将在线程1-4上放置一个项目,在线程5-8上放置两个项目,当任务完成时,接下来的四个可用线程上将有四个项目,然后是八个依此类推,直到达到256,这是任何线程上的最大项目数。 显然,对于该特定问题,此策略比Java的标准策略好得多。

这是常见的ForkJoinPool中的线程在我的8线程笔记本电脑上的样子:

共叉联合池

创建自己的并行策略

Speedment的一件很酷的事情是,我们可以很容易地编写并行化策略,然后将其注入流中。 考虑以下自定义并行化策略:

public static class MyParallelStrategy implements ParallelStrategy {private final static int[] BATCH_SIZES = {1, 2, 4, 8};@Overridepublic <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {return ConfigurableIteratorSpliterator.of(iterator, characteristics, BATCH_SIZES);}}

实际上,它可以表达得更短:

ParallelStrategy myParallelStrategy = ParallelStrategy.of(1, 2, 4, 8);

此策略将在第一个可用线程上放置一个工作项,在第二个可用线程上放置两个,在第三个线程上放置四个,在第四个线程上放置八个,其中八个是数组中的最后一位。 最后一位数字将用于所有后续可用线程。 因此,订单实际上变成了1、2、4、8、8、8、8...。现在,我们可以使用以下新策略:

Manager<PrimeCandidate> candidatesCustom = app.configure(PrimeCandidateManager.class).withParallelStrategy(myParallelStrategy).build();candidatesCustom.stream().parallel().filter(PrimeCandidate.PRIME.isNull()).map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))).forEach(candidatesCustom.updater());

瞧! 我们完全控制工作项在可用执行线程上的布局方式。

基准测试

所有基准都使用相同的主要候选者输入。 测试是在MacBook Pro,具有4个物理核心和8个线程的2.2 GHz Intel Core i7上进行的。

StrategySequential                       265 s (One thread processed all 1100 items)
Parallel Default Java 8          235 s (Because 1024 items were processed by thread 1 and 76 items by thread 2)
Parallel computeIntensityHigh()   69 s (All 4 hardware cores were used)

结论

Speedment支持并行处理数据库内容。 Speedment支持多种并行策略,以允许充分利用执行环境。

我们可以轻松地创建自己的并行策略,并在Speedment流中使用它们。 通过仔细选择一种并行策略而不是仅使用Java的默认策略,可以显着提高性能。

翻译自: https://www.javacodegeeks.com/2016/10/work-parallel-database-streams-using-java-8.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idf实验室--简单编程字符统计

idf实验室--简单编程字符统计&#xff0c;有需要的朋友可以参考下。 第一眼看这道题很简单&#xff0c;不就是字符统计么&#xff0c;可是题目要求2s内回答&#xff0c;而且每次打开的页面需要统计的字符串内容都会变&#xff0c;这就蛋疼了&#xff0c;于是乎上网学习下如何提…

微服务(一) --- 架构与选型

微服务架构的概述 应用架构的发展 应用是可以独立运行的程序代码,提供相对完善的业务功能. 目前的软件架构有三种架构类型: 业务架构应用架构技术架构他们之间的甚是: 业务架构决定应用架构,技术架构支撑着应用架构. 应用架构的发展历程: 单体架构: 最古老的单体应用,没有任何应…

linux常见致命错误(fatal error),解决办法:

常见错误一&#xff1a;fatal error: zlib.h: No such file or directory 解决办法&#xff1a; sudo apt-get install zlib1g-dev 常见错误二&#xff1a;fatal error: curses.h: No such file or directory #include <curses.h> 解决办法&#xff1a;sudo apt-get i…

brainfu*k语言执行

输入&#xff1a; brainfu*k 代码&#xff1a; [>>>><<<<-]>.>....>.<<.>..------.--------.>.>. 程序&#xff1a; #include <stdio.h> #include <stdlib.h> #include <string.h> #define MAXL 1000 /*…

Linux连接Windows服务器以及文件传输方法

最近&#xff0c;由于自身需要将操作系统换为Linux&#xff08;Ubuntu&#xff09;系统&#xff0c;但是由于实验室服务器为Windows系统&#xff0c;需要在Linux上连接Windows服务器&#xff0c;方法如下&#xff1a; 首先安装rdesktop 安装方法&#xff1a; sudo apt-get ins…

Filter过滤要登录的页面(重要)

一、为什么要写过滤器&#xff0c;过滤页面&#xff1f; 本人做了一个网站&#xff0c;目前还在开发。做过滤器的目的就是为了要过滤一些页面必需要用户登录之后才能看&#xff0c;主页什么的可以随便看&#xff0c;一旦涉及到要发布或评论什么信息&#xff0c;就必须要过滤用户…

mybatis crud_MyBatis教程– CRUD操作和映射关系–第1部分

mybatis crudCRUD操作 MyBatis是一个SQL Mapper工具&#xff0c;与直接使用JDBC相比&#xff0c;它极大地简化了数据库编程。 步骤1&#xff1a;创建一个Maven项目并配置MyBatis依赖项。 <project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/20…

C语言实现字符串匹配KMP算法

相信很多人&#xff08;包括自己&#xff09;初识KMP算法的时候始终是丈二和尚摸不着头脑&#xff0c;要么完全不知所云&#xff0c;要么看不懂书上的解释&#xff0c;要么自己觉得好像心里了解KMP算法的意思&#xff0c;却说不出个究竟&#xff0c;所谓知其然不知其所以然是也…

Linux问题分析或解决_samba无法连接

1. windows设置方面问题 问题&#xff1a;window能连接部分服务器的samba共享&#xff0c;一部分无法连接。报错如截图。 解决&#xff1a;前提---其他人连接都没有问题&#xff0c;发现有问题的连接服务器的电脑是win10&#xff0c;而win10可以连接到的服务器系统比较新&#…

Drools 7支持DMN(决策模型和表示法)

决策模型和表示法&#xff08;DMN&#xff09;规范是OMG&#xff08;对象管理小组&#xff09;提出的相对较新的标准&#xff0c;旨在为业务规则和业务决策做些什么。 BPMN&#xff08;它的兄弟规范&#xff09;用于业务流程&#xff1a;标准化表示法和执行语义&#xff0c;以…

Ubuntu系统下Python虚拟环境构建详解

在编程开发中&#xff0c;我们经常会利用不同版本的协助软件包&#xff0c;这样就导致一些软件不能兼容&#xff0c;为了解决这个问题呢&#xff0c;我们在儿引进Python虚拟环境&#xff0c;我们安装好虚拟环境之后&#xff0c;进一步激活它&#xff0c;然后在虚拟环境中运行不…

字符串匹配算法

1. 朴素算法 朴素算法是最简单的字符串匹配算法&#xff0c;也是人们接触得最多的字符串匹配算法。 2. Rabin-Karp算法 一个时间复杂度为O(&#xff08;N-M1)*M)的字符串匹配算法&#xff0c;即Rabin-Karp算法。Rabin-Karp算法的预处理时间是O(m)&#xff0c; 匹配时间OO(&…

SolrCloud集群配置

前提&#xff1a; 1&#xff0c;已经做好zookeeper集群或伪集群配置. 2&#xff0c;已将solr部署到tomcat中 接下来&#xff0c;我们将zookeeper与tomcat进行关联 1 vim tomcat/bin/catalina.sh tomcat1的bin目录下catalina.sh文件在第二行加入 1 JAVA_OPTS"-Dbootstrap_c…

Ubuntu18.04 台式电脑插入耳机没有声音解决办法

最近换位ubnutu18.04后发现电脑戴耳机没有声音网上查了一下解决办法如下&#xff1a; 1、打开命令行&#xff0c;输入&#xff1a;sudo apt-get install pavucontrol 2、接着再命令行中输入: pavucontrol 在上面点击向右按钮&#xff0c;然后会出现configuration&#xff0…

第2章 网页基础知识

HTTP 基础术语HTTP 请求过程HTTP Headers 信息网页的组成网页的结构HTML节点树CSS 选择器爬虫的基本原理HTTP CookiesHTTP 代理转载于:https://www.cnblogs.com/pzk7788/p/10512338.html

适用于无服务器Java开发人员的AWS Lambda:它为您带来了什么?

无服务器计算如何帮助您的生产基础架构&#xff1f; 在过去的几年中&#xff0c;无服务器计算架构一直受到关注&#xff0c;因为它专注于应用程序的主要组件之一&#xff1a;服务器。 这种体系结构采用了不同的方法。 在下面的文章中&#xff0c;我们将解释无服务器的含义&am…

Python的sys.stdout、sys.stdin重定向

Python的sys.stdout、sys.stdin重定向 转自&#xff1a;http://www.cnblogs.com/turtle-fly/p/3280519.html 本文环境&#xff1a;Python 2.7 使用 print obj 而非 print(obj) 一些背景 sys.stdout 与 print 当我们在 Python 中打印对象调用 print obj 时候&#xff0c;事实上…

Win10+Ubuntu16.04/Ubuntu18.04双系统安装教程

最近因为开发需要安装Linux系统&#xff0c;因为安装好几次Ubuntu18.04失败&#xff0c;退而安装Ubuntu16.04 安装也失败好几次&#xff0c;在不断尝试下终于解决&#xff0c;后来思考一下觉得Ubuntu 16.04/18.04安装失败原因一致,先进行分享。 先把我遇到的问题给大家看看如下…

获取XML的文件信息

1 /**2 * 获取XML文件的信息3 */4 import java.io.IOException;5 import javax.xml.parsers.DocumentBuilder;6 import javax.xml.parsers.DocumentBuilderFactory;7 import javax.xml.parsers.ParserConfigurationException;8 import org.w3c.dom.Document;9 import org.w3c…

python 中的三元表达式(三目运算符)

python中的三目运算符不像其他语言 其他的一般都是 判定条件?为真时的结果:为假时的结果 如 result5>3?1:0 这个输出1&#xff0c;但没有什么意义&#xff0c;仅仅是一个例子。 而在python中的格式为 为真时的结果 if 判定条件 else 为假时的结果 还是上面的例子 1 if 5…