文章目录  模拟实现   启动结果如下:    
 
package  com. example. demo. demo ; import  com. example. demo. common.  Consumer ; 
import  com. example. demo. common.  MqException ; 
import  com. example. demo. mqclient.  Channel ; 
import  com. example. demo. mqclient.  Connection ; 
import  com. example. demo. mqclient.  ConnectionFactory ; 
import  com. example. demo. mqsever. core.  BasicProperties ; 
import  com. example. demo. mqsever. core.  ExchangeType ; import  java. io.  IOException ; 
/**
 * Demo consumer: connects to the message-queue server at 127.0.0.1:9090,
 * declares the exchange "testExchange" and the queue "testQueue", subscribes
 * to the queue, and prints every delivery it receives to standard output.
 */
public class DemoConsumer {

    /**
     * Entry point. Sets up the connection, channel, exchange, queue and
     * subscription, then keeps the main thread alive indefinitely.
     *
     * @param args command-line arguments (unused)
     * @throws IOException          propagated from the client calls
     * @throws MqException          propagated from the client calls
     * @throws InterruptedException if the keep-alive sleep is interrupted
     */
    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        System.out.println("启动消费者!");

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // NOTE(review): the boolean/null arguments presumably mean
        // durable / autoDelete / arguments (exchange) and
        // durable / exclusive / autoDelete / arguments (queue) -- confirm against Channel.
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        // NOTE(review): the second argument is presumably autoAck -- confirm against Channel.
        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body)
                    throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                // Decode with an explicit charset: the no-charset String constructor uses the
                // platform default, which can garble non-ASCII payloads on some hosts.
                String bodyString = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

        // Keep the main thread alive so the process does not exit right after subscribing.
        while (true) {
            Thread.sleep(500);
        }
    }
}
// Header of the next listing (DemoProducer), which shared this line with the brace above.
package com.example.demo.demo;
import com.example.demo.mqclient.Channel;
import  com. example. demo. mqclient.  Connection ; 
import  com. example. demo. mqclient.  ConnectionFactory ; 
import  com. example. demo. mqsever. core.  ExchangeType ; import  java. io.  IOException ; 
/**
 * Demo producer: connects to the message-queue server at 127.0.0.1:9090,
 * declares the exchange "testExchange" and the queue "testQueue", publishes a
 * single "hello" message, and then closes the channel and the connection.
 */
public class DemoProducer {

    /**
     * Entry point. Publishes one message and releases the channel and
     * connection even when a step in between fails.
     *
     * @param args command-line arguments (unused)
     * @throws IOException          propagated from the client calls
     * @throws InterruptedException if the post-publish sleep is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // NOTE(review): the boolean/null arguments presumably mean
                // durable / autoDelete / arguments (exchange) and
                // durable / exclusive / autoDelete / arguments (queue) -- confirm against Channel.
                channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
                channel.queueDeclare("testQueue", true, false, false, null);

                // Encode with an explicit charset so the bytes on the wire do not depend on
                // the platform default of the machine running the producer.
                byte[] body = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
                boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
                System.out.println("消息投递完成! ok=" + ok);

                // Brief pause before tearing down; presumably lets in-flight traffic settle -- confirm.
                Thread.sleep(500);
            } finally {
                // Always release the channel, even if declaring/publishing/sleeping threw.
                channel.close();
            }
        } finally {
            // Always release the connection, even if channel creation or close threw.
            connection.close();
        }
    }
}