react回调
我可以更好地了解a的用途的一种方式
基于React流的方法是它简化了无阻塞IO调用的方式。
这篇文章将快速讲解进行同步远程调用所涉及的那种代码,然后说明如何在非阻塞IO中分层,尽管在资源(尤其是线程)的使用方面非常高效,但会带来复杂性,称为回调地狱和基于React流的方法如何简化编程模型。
目标服务
由于我将编写一个客户呼叫,因此代表城市详细信息的目标服务有两个端点。 当使用uri类型的“ / cityids”调用时,返回一个城市ID的列表,结果示例如下:
[ 1 , 2 , 3 , 4 , 5 , 6 , 7 ]
端点返回给定城市ID的城市的详细信息,例如,当使用ID 1 –“ / cities / 1”进行调用时:
{ "country" : "USA" , "id" : 1 , "name" : "Portland" , "pop" : 1600000 }
客户的责任是获取城市ID的列表,然后为每个城市ID获取城市的详细信息并将其放到城市列表中。
同步通话
我正在使用Spring Framework的RestTemplate进行远程调用。 Kotlin函数获取城市ID列表如下所示:
private fun getCityIds(): List<String> { val cityIdsEntity: ResponseEntity<List<String>> = restTemplate .exchange( " http://localhost: $localServerPort/cityids" , HttpMethod.GET, null , object : ParameterizedTypeReference<List<String>>() {}) return cityIdsEntity.body!! }
并获取城市的详细信息:
private fun getCityForId(id: String): City { return restTemplate.getForObject( " http://localhost: $localServerPort/cities/$id" , City:: class .java)!! }
给定这两个功能,可以很容易地将它们组合起来,以便返回城市列表:
val cityIds: List<String> = getCityIds() val cities: List<City> = cityIds .stream() .map<City> { cityId -> getCityForId(cityId) } .collect(Collectors.toList()) cities.forEach { city -> LOGGER.info(city.toString()) }
该代码非常易于理解,但是涉及8个阻塞调用–
1.获取7个城市ID的列表,然后获取每个城市的详细信息 2.获取七个城市中每个城市的详细信息
这些调用中的每一个都会位于不同的线程上。
结合使用非阻塞IO和回调
我将使用一个名为AsyncHttpClient的库进行非阻塞IO调用。
进行远程调用时,AyncHttpClient返回一个ListenableFuture类型。
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute()
可以将回调附加到可监听的将来,以便在可用时对响应进行操作。
responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) .... }
给定城市ID列表,我想获取城市的详细信息,因此从响应中,我需要进行更多的远程调用,并为每个调用附加一个回调,以沿以下方向获取城市的详细信息:
val responseListenableFuture: ListenableFuture<Response> = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() responseListenableFuture.addListener(Runnable { val response: Response = responseListenableFuture.get() val responseBody: String = response.responseBody val cityIds: List<Long> = objectMapper.readValue<List<Long>>(responseBody, object : TypeReference<List<Long>>() {}) cityIds.stream().map { cityId -> val cityListenableFuture = asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() cityListenableFuture.addListener(Runnable { val cityDescResp = cityListenableFuture.get() val cityDesc = cityDescResp.responseBody val city = objectMapper.readValue(cityDesc, City:: class .java) LOGGER.info( "Got city: $city" ) }, executor) }.collect(Collectors.toList()) }, executor)
这是一段粗糙的代码,在一个回调中有一组回调,很难对此进行推理和理解,因此被称为回调地狱。
在Java CompletableFuture中使用非阻塞IO
通过返回Java的CompletableFuture作为返回类型而不是ListenableFuture,可以对代码进行一些改进。 CompletableFuture提供允许修改返回的返回类型的运算符。
例如,考虑使用函数获取城市ID列表:
private fun getCityIds(): CompletableFuture<List<Long>> { return asyncHttpClient .prepareGet( " http://localhost: $localServerPort/cityids" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody val l: List<Long> = objectMapper.readValue(s, object : TypeReference<List<Long>>() {}) l } }
在这里,我使用“ thenApply”运算符将“ CompletableFuture <Response>”转换为“ CompletableFuture <List <Long >>”
并类似地获得城市的详细信息:
private fun getCityDetail(cityId: Long): CompletableFuture<City> { return asyncHttpClient.prepareGet( " http://localhost: $localServerPort/cities/$cityId" ) .execute() .toCompletableFuture() .thenApply { response -> val s = response.responseBody LOGGER.info( "Got {}" , s) val city = objectMapper.readValue(s, City:: class .java) city } }
这是基于回调方法的改进,但是,CompletableFuture缺少足够的运算符,例如,在这种特定情况下,需要将所有城市详细信息放在一起:
val cityIdsFuture: CompletableFuture<List<Long>> = getCityIds() val citiesCompletableFuture: CompletableFuture<List<City>> = cityIdsFuture .thenCompose { l -> val citiesCompletable: List<CompletableFuture<City>> = l.stream() .map { cityId -> getCityDetail(cityId) }.collect(toList()) val citiesCompletableFutureOfList: CompletableFuture<List<City>> = CompletableFuture.allOf(*citiesCompletable.toTypedArray()) .thenApply { _: Void? -> citiesCompletable .stream() .map { it.join() } .collect(toList()) } citiesCompletableFutureOfList }
我使用了一个名为CompletableFuture.allOf的运算符,该运算符返回“ Void”类型,并且必须强制返回所需的“” CompletableFuture <List <City >>类型。
使用Project Reactor
Project Reactor是Reactive Streams规范的实现。 它有两种特殊类型,可返回0/1项目流和0 / n项目流–前者是Mono,后者是Flux。
Project Reactor提供了一组非常丰富的运算符,这些运算符允许以多种方式转换数据流。 首先考虑该函数以返回城市ID列表:
private fun getCityIds(): Flux<Long> { return webClient.get() .uri( "/cityids" ) .exchange() .flatMapMany { response -> LOGGER.info( "Received cities.." ) response.bodyToFlux<Long>() } }
我正在使用Spring出色的WebClient库进行远程调用,并获得ProjectReact器“ Mono <ClientResponse>”类型的响应,可以使用“ flatMapMany”运算符将其修改为“ Flux <Long>”类型。
在给定城市ID的情况下,按照相同的步骤获取城市的详细信息:
private fun getCityDetail(cityId: Long?): Mono<City> { return webClient.get() .uri( "/cities/{id}" , cityId!!) .exchange() .flatMap { response -> val city: Mono<City> = response.bodyToMono() LOGGER.info( "Received city.." ) city } }
在这里,使用“ flatMap”运算符将项目React堆“ Mono <ClientResponse>”类型转换为“ Mono <City>”类型。
以及从中获取城市ID和城市的代码:
val cityIdsFlux: Flux<Long> = getCityIds() val citiesFlux: Flux<City> = cityIdsFlux .flatMap { this .getCityDetail(it) } return citiesFlux
这非常具有表现力-对比了基于回调的方法的混乱和基于响应流的方法的简单性。
结论
在我看来,这是使用基于响应流的方法的最大原因之一,尤其是在涉及跨越异步边界的场景(例如在这种情况下进行远程调用)的情况下,尤其是Project Reactor。 它清除了各种回调和回调地狱,并提供了使用一组丰富的运算符修改/转换类型的自然方法。
我在这里使用的所有示例的工作版本的存储库位于https://github.com/bijukunjummen/reactive-cities-demo/tree/master/src/test/kotlin/samples/geo/kotlin
翻译自: https://www.javacodegeeks.com/2019/06/callback-hell-reactive-patterns.html
react回调