react 消息队列
React性流是最近宣布的一项计划,旨在在JVM上为具有内置背压的异步流处理创建标准。 该工作组由Typesafe,Red Hat,Oracle,Netflix等公司组成。
早期的实验性实现之一是基于Akka的 。 预览版0.3包括演员生产者和消费者,这为集成提供了新的可能性。
 
为了测试新技术,我实现了一个非常简单的Reactive Message Queue 。 该代码处于PoC阶段,缺乏错误处理等功能,但如果使用正确,则可以正常工作!
队列是响应式的,这意味着消息将在需要时传递给感兴趣的各方,而无需轮询。 在发送消息时(以便发送者不会使代理不堪重负)和在接收消息时(以便代理仅发送与接收者可以使用的消息一样多的消息)都会施加反压。
让我们看看它是如何工作的!
队列
 首先,队列本身是一个参与者,对(React式)流一无所知。 该代码位于com.reactmq.queue包中。 actor接受以下actor消息(“ message”一词在此处已重载,因此我将使用普通的“ message”来表示我们发送到队列和从队列中接收的消息,而“ actor-messages”则为Scala)。类实例发送给演员): 
-  SendMessage(content)–发送具有指定String内容的消息。 回复(SentMessage(id))被发送回带有消息ID的发件人
-  ReceiveMessages(count)–表示发件人(演员)想接收最多邮件count信号。 该计数与先前发出信号的需求累加。
-  DeleteMessage(id)–毫不奇怪,删除一条消息
队列实现是ElasticMQ的简化版本。 收到消息后,如果在10秒钟内未将其删除(确认),则可以再次接收。
 当一个actor发出对消息的需求信号时(通过将ReceiveMessages发送到队列actor),它应该期望有任意数量的ReceivedMessages(msgs) actor-message答复,其中包含接收到的数据。 
变得被动
要创建和测试我们的React式队列,我们需要三个应用程序:
-  Sender
-  中央Broker
-  Receiver
 我们可以运行任何数量的Senders和Receivers ,但是当然我们应该只运行一个Broker 。 
 我们需要做的第一件事是通过网络将Sender与Broker连接,将Receiver与Broker连接。 我们可以使用Akka IO扩展和React式TCP扩展来做到这一点。 使用connect和bind对,我们在绑定端获得了一个连接流: 
// sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)connectFuture.onSuccess {case binding: StreamTcp.OutgoingTcpConnection =>logger.info("Sender: connected to broker")// per-connection logic
}// broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)bindSendFuture.onSuccess {case serverBinding: StreamTcp.TcpServerBinding =>logger.info("Broker: send bound")Flow(serverBinding.connectionStream).foreach { conn =>// per-connection logic}.consume(materializer)
}有一个用于发送和接收消息的地址。
寄件人
 首先让我们看一下Sender的每个连接逻辑。 
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }).map { msg =>logger.debug(s"Sender: sending $msg")createFrame(msg)}.toProducer(materializer).produceTo(binding.outputStream) 我们正在创建一个滴答流,它每秒产生一个新消息(非常方便测试)。 使用map流转换器,我们用消息创建了一个字节帧(稍后会详细介绍)。 但这仅是我们(非常简单)流的外观的描述; 它需要使用物化 toProducer方法,该方法将提供流变换节点的具体实现。 当前只有一个FlowMaterializer ,这同样令人惊讶地使用引擎盖下的Akka actor来实际创建流和流。 
 最后,我们将刚刚创建的生产者连接到TCP绑定的outputStream ,而恰好是消费者。 现在,我们有了一个React性的网络上的消息流,这意味着仅当Broker可以接受消息时才发送消息。 否则,反压将一直施加到滴答声产生器。 
  
代理:发送消息
 在网络的另一端是Broker 。 让我们看看消息到达时会发生什么。 
Flow(serverBinding.connectionStream).foreach { conn =>logger.info(s"Broker: send client connected (${conn.remoteAddress})")val sendToQueueConsumer = ActorConsumer[String](system.actorOf(Props(new SendToQueueConsumer(queueActor))))// sending messages to the queue, receiving from the clientval reconcileFrames = new ReconcileFrames()Flow(conn.inputStream).mapConcat(reconcileFrames.apply).produceTo(materializer, sendToQueueConsumer)
}.consume(materializer) 首先,我们创建了一个Flow ,那将是字节输入流-从连接的输入流。 接下来,我们重新构造使用框架发送的String实例,最后将流定向到发送到队列的使用者。 
 SendToQueueConsumer是到主队列SendToQueueConsumer的每个连接的桥。 它使用Akka的Reactive Streams实施中的ActorConsumer特性来自动管理应该在上游发出信号的需求。 利用该特征,我们可以创建一个由演员支持的React流Consumer[_] ,从而实现完全可定制的接收器。 
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {private var inFlight = 0override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {override def inFlightInternally = inFlight}override def receive = {case OnNext(msg: String) =>queueActor ! SendMessage(msg)inFlight += 1case SentMessage(_) => inFlight -= 1}
} 需要提供给ActorConsumer是一种测量当前正在处理的流项目的方法。 在这里,我们正在计算已发送到队列但尚未收到ID的消息数(因此,队列正在处理它们)。 
 消费者收到包装在OnNext actor消息中的新消息; 因此, OnNext由流发送给SentMessage ,而SentMessage被队列SentMessage发送以回复SendMessage 。 
接收
 接收部分以类似的方式完成,尽管它需要一些额外的步骤。 首先,如果您查看Receiver ,您将看到我们正在从输入流中读取字节,从帧中重构消息,并发回ID,从而确认消息。 实际上,我们将在接收消息和发送回ID之间运行一些消息处理逻辑。 
 在Broker方,我们为每个连接创建两个流。 
 一个是发送给接收者的消息流,另一个是来自接收者的已确认消息ID的流,这些流被简单地转换为将DeleteMessage消息发送给队列actor。 
 与使用者类似,我们需要从队列参与者到流的每个连接的接收桥。 这是在ReceiveFromQueueProducer实现的。 在这里,我们扩展了ActorProducer特性,它使您可以完全控制实际创建流中消息的过程。 
 在此参与者中,流正在发送Request参与者消息,以发出需求信号。 有需求时,我们从队列中请求消息。 队列最终将以一个或多个ReceivedMessages actor消息进行响应(当队列中有任何消息时); 由于消息的数量永远不会超出信号需求,因此我们可以安全地调用ActorProducer.onNext方法,该方法将给定的项目发送到下游。 
构图
 一个小细节是我们需要一个自定义的框架协议(感谢Roland Kuhn的澄清 ),因为TCP流只是一个字节流,因此我们可以获得数据的任意片段,以后需要重新组合。 幸运的是,实现这样的框架非常简单–请参阅Framing类。 每个帧都由消息的大小和消息本身组成。 
加起来
 使用React式流和Akka实施,可以轻松创建具有端到端背压的React式应用程序。 上面的队列虽然缺少很多功能和证明,但不允许Senders使Broker过载,而另一方面, Broker会使Receivers过载。 所有这些,而无需实际编写任何背压处理代码! 
翻译自: https://www.javacodegeeks.com/2014/06/reactive-queue-with-akka-reactive-streams.html
react 消息队列