网站与维护wordpress 删除超文本
网站与维护,wordpress 删除超文本,广州网络推广服务商,wordpress免费主题推荐rxjava 被观察者大约4年前#xff0c;我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions #xff0c;但是直到我几周前看到Matthew在Code Mesh上发表演讲后#xff0c;我才对它有所了解。 最近它似乎越来越流行#xff0c;我注意到Netflix编写了一个Java版本… rxjava 被观察者 大约4年前我第一次在Matthew Podwysocki的博客上遇到了Reactive Extensions 但是直到我几周前看到Matthew在Code Mesh上发表演讲后我才对它有所了解。 最近它似乎越来越流行我注意到Netflix编写了一个Java版本RxJava 。 我以为可以尝试通过更改在探索cypher的MERGE函数时暴露的Observable而不是Future的代码来尝试一下。 回顾一下我们有50个线程我们进行了100次迭代在这些迭代中我们创建了随机用户事件对。 我们最多创建10个用户和50个事件目标是同时发送相同对的请求。 在另一篇文章的示例中我丢弃了每个查询的结果而在这里我返回了结果因此我有一些要订阅的内容。 代码的轮廓如下所示 public class MergeTimeRx
{public static void main( final String[] args ) throws InterruptedException, IOException{String pathToDb /tmp/foo;FileUtils.deleteRecursively( new File( pathToDb ) );GraphDatabaseService db new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );final ExecutionEngine engine new ExecutionEngine( db );int numberOfThreads 50;int numberOfUsers 10;int numberOfEvents 50;int iterations 100;ObservableExecutionResult events processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );events.subscribe( new Action1ExecutionResult(){Overridepublic void call( ExecutionResult result ){for ( MapString, Object row : result ){}}} );....}} 使用RxJava的好处是没有提到我们如何获取ExecutionResult的集合这并不重要。 我们只有它们的流并且通过在Observable上调用订阅函数只要有另一个函数可用我们就会得到通知。 我发现的大多数示例都显示了如何从单个线程生成事件但是我想使用线程池以便可以同时触发许多请求。 processEvents方法最终看起来像这样 private static ObservableExecutionResult processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ){final Random random new Random();final ListInteger userIds generateIds( numberOfUsers );final ListInteger eventIds generateIds( numberOfEvents );return Observable.create( new Observable.OnSubscribeFuncExecutionResult(){Overridepublic Subscription onSubscribe( final Observer? super ExecutionResult observer ){final ExecutorService executor Executors.newFixedThreadPool( numberOfThreads );ListFutureExecutionResult jobs new ArrayList();for ( int i 0; i iterations; i ){FutureExecutionResult job executor.submit( new CallableExecutionResult(){Overridepublic ExecutionResult call(){Integer userId userIds.get( random.nextInt( numberOfUsers ) );Integer eventId eventIds.get( random.nextInt( numberOfEvents ) );return engine.execute(MERGE (u:User {id: {userId}})\n MERGE (e:Event {id: {eventId}})\n MERGE (u)-[:HAS_EVENT]-(e)\n RETURN u, e,MapUtil.map( userId, userId, eventId, eventId ) );}} );jobs.add( job );}for ( FutureExecutionResult future : jobs ){try{observer.onNext( future.get() );}catch ( InterruptedException | ExecutionException ignored ){}}observer.onCompleted();executor.shutdown();return Subscriptions.empty();}} );} 我不确定这是否是使用Observable的正确方法因此如果我记错了请在评论中让我知道。 我不确定处理错误的正确方法是什么。 我最初在catch块中调用了observeronError 但这意味着不会再产生不是我想要的事件。 如果您想使用它该代码可以作为要点 。 我添加了以下依赖关系以获取RxJava库 dependencygroupIdcom.netflix.rxjava/groupIdartifactIdrxjava-core/artifactIdversion0.15.1/version/dependency 参考 RxJava 从未来到我们的JCG合作伙伴 Mark Needham在Mark Needham Blog博客上均可观察到。 翻译自: https://www.javacodegeeks.com/2014/01/rxjava-from-future-to-observable.htmlrxjava 被观察者
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/91874.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!