目录  1.介绍 2.安装   3.AMQP-CPP 简单使用   4.类与接口   5.使用 1.publish.cc 2.consume.cc 3.makefile   
 
RabbitMQ:消息队列 组件,实现两个客户端主机之间消息传输的功能(发布&订阅)核心概念 :交换机、队列、绑定、消息交换机类型 : 广播交换 :当交换机收到消息,则将消息发布到所有绑定的队列中直接交换 :根据消息中的bkey与绑定的rkey对比,一致则放入队列主题交换 :使用bkey与绑定的rkey进行规则匹配,成功则放入队列C语言库 C++库sudo  apt  install  libev-dev 
git  clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd  AMQP-CPP/
make 
make  install 
 如果安装时出现以下报错,则表示ssl版本出现问题/usr/include/openssl/macros.h:147:4: error: #error 
"OPENSSL_API_COMPAT expresses an impossible API compatibility 
level" 147 | #  error "OPENSSL_API_COMPAT expresses an impossible API 
compatibility level" |    ^~~~~ 
In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12: 
/usr/include/openssl/bio.h:687:1: error: expected constructor, 
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
unsigned short *port_ptr))
 解决方案 :卸载当前的ssl库,重新进行修复安装dpkg -l |  grep  ssl
sudo  dpkg -P --force-all libevent-openssl-2.1-7
sudo  dpkg -P --force-all openssl
sudo  dpkg -P --force-all libssl-dev
sudo  apt  --fix-broken install 
AMQP-CPP是用于与RabbitMq消息中间件通信的C++库 它能解析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包 AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成AMQP-CPP提供了可选的网络层接口,它预定义了TCP模块,用户就不用自己实现网络IO, 也可以选择libevent、libev、libuv、asio等异步通信组件, 需要手动安装对应的组件 AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中注意 :它需要C++17的支持AMQP-CPP的使用有两种模式: 使用默认的TCP模块进行网络通信 使用扩展的libevent、libev、libuv、asio异步通信组件进行通信 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP::LibEvHandler channel是一个虚拟连接,一个连接上可以建立多个通道 并且所有的RabbitMq指令都是通过channel传输   因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果 实际上它返回的是Deferred类,可以使用它安装处理函数  namespace  AMQP 
{   using  SuccessCallback =  std:: function< void ( ) > ;  using  ErrorCallback =  std:: function< void ( const  char  * message) > ;  using  FinalizeCallback =  std:: function< void ( ) > ;  using  QueueCallback =  std:: function< void ( const  std:: string & name,  uint32_t  messagecount,  uint32_t  consumercount) > ; using  DeleteCallback =  std:: function< void ( uint32_t  deletedmessages) > ;  using  MessageCallback =  std:: function< void ( const  Message & message,  uint64_t  deliveryTag,  bool  redelivered) > ;  using  AckCallback =  std:: function< void ( uint64_t  deliveryTag,  bool  multiple) > ; using  PublishAckCallback =  std:: function< void ( ) > ;  using  PublishNackCallback =  std:: function< void ( ) > ;  using  PublishLostCallback =  std:: function< void ( ) > ;  class  Channel  {  Channel ( Connection * connection) ;  bool  connected ( ) ;  Deferred & declareExchange ( const  std:: string_view & name, ExchangeType type, int  flags, const  Table & arguments) ;  DeferredQueue & declareQueue ( const  std:: string_view & name, int  flags, const  Table & arguments) ;  Deferred & bindQueue ( const  std:: string_view & exchange, const  std:: string_view & queue, const  std:: string_view & routingkey, const  Table & arguments) ;  bool  publish ( const  std:: string_view & exchange, const  std:: string_view & routingKey, const  std:: string & message, int  flags =  0 ) ;  DeferredConsumer & consume ( const  std:: string_view & queue, const  std:: string_view & tag, int  flags, const  Table & arguments) ;  bool  ack ( uint64_t  deliveryTag,  int  flags= 0 ) ; } ; class  DeferredConsumer  {   DeferredConsumer & onSuccess ( const  ConsumeCallback&  callback) ;  DeferredConsumer & onReceived ( const  MessageCallback&  callback) ;  DeferredConsumer & onMessage ( const  MessageCallback&  callback) ;  DeferredConsumer & onCancelled ( const  CancelCallback&  callback) ; } ; class  Message  :  public  Envelope {  const  std:: string & exchange ( ) ; const  std:: string & routingkey ( ) ; } ; class  Envelope  :  public  MetaData {  const  char  * body ( ) ;   uint64_t  bodySize ( ) ;  } ; 
} 
typedef  struct  ev_async  
{  EV_WATCHER  ( ev_async) ; EV_ATOMIC_T sent;   
} ev_async;  
enum  
{  EVBREAK_CANCEL =  0 ,   EVBREAK_ONE    =  1 ,   EVBREAK_ALL    =  2    
} ;  
struct  ev_loop  * ev_default_loop  ( unsigned  int  flags EV_CPP  ( =  0 ) ) ; 
#  define  EV_DEFAULT   ev_default_loop  ( 0 )  int   ev_run  ( struct  ev_loop  * loop) ; 
void  ev_break  ( struct  ev_loop  * loop,  int32_t  break_type)  ;   void  ( * callback) ( struct  ev_loop  * loop,  ev_async * watcher,  int32_t  revents) ; 
void  ev_async_init ( ev_async * w,  callback cb) ; 
void  ev_async_start ( struct  ev_loop  * loop,  ev_async * w) ;  
void  ev_async_send ( struct  ev_loop  * loop,  ev_async * w) ; 
# include  <ev.h> # include  <amqpcpp.h> # include  <amqpcpp/libev.h> # include  <openssl/ssl.h> # include  <openssl/opensslv.h> int  main ( ) 
{ auto  * loop =  EV_DEFAULT; AMQP:: LibEvHandler handler ( loop) ; AMQP:: Address address ( "amqp://root:SnowK8989@127.0.0.1:5672/" ) ; AMQP:: TcpConnection connection ( & handler,  address) ; AMQP:: TcpChannel channel ( & connection) ; channel. declareExchange ( "test-exchange" ,  AMQP:: ExchangeType:: direct) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "声明交换机失败: "  <<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-exchange 交换机创建成功"  <<  std:: endl;  } ) ; channel. declareQueue ( "test-queue" ) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "声明队列失败: "  <<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-queue 队列创建成功"  <<  std:: endl;  } ) ; channel. bindQueue ( "test-exchange" ,  "test-queue" ,  "test-queue-key" ) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "test-exchange - test-queue 绑定失败: "  \<<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-exchange - test-queue 绑定成功"  <<  std:: endl;  } ) ; for  ( int  i =  0 ;  i <  5 ;  ++ i) { std:: string msg =  "Hello SnowK-"  +  std:: to_string ( i) ; if ( channel. publish ( "test-exchange" ,  "test-queue-key" ,  msg)  ==  false ) { std:: cout <<  "publish 失败"  <<  std:: endl; } } ev_run ( loop,  0 ) ; return  0 ; 
} 
# include  <ev.h> # include  <amqpcpp.h> # include  <amqpcpp/libev.h> # include  <openssl/ssl.h> # include  <openssl/opensslv.h> void  MessageCB ( AMQP:: TcpChannel*  channel,  const  AMQP:: Message&  message,  uint64_t  deliveryTag,  bool  redelivered) 
{ std:: string msg; msg. assign ( message. body ( ) ,  message. bodySize ( ) ) ; std:: cout <<  msg <<  std:: endl; channel-> ack ( deliveryTag) ; 
} int  main ( ) 
{ auto  * loop =  EV_DEFAULT; AMQP:: LibEvHandler handler ( loop) ; AMQP:: Address address ( "amqp://root:SnowK8989@127.0.0.1:5672/" ) ; AMQP:: TcpConnection connection ( & handler,  address) ; AMQP:: TcpChannel channel ( & connection) ; channel. declareExchange ( "test-exchange" ,  AMQP:: ExchangeType:: direct) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "声明交换机失败: "  <<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-exchange 交换机创建成功"  <<  std:: endl;  } ) ; channel. declareQueue ( "test-queue" ) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "声明队列失败: "  <<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-queue 队列创建成功"  <<  std:: endl;  } ) ; channel. bindQueue ( "test-exchange" ,  "test-queue" ,  "test-queue-key" ) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "test-exchange - test-queue 绑定失败: "  \<<  message <<  std:: endl;  } ) . onSuccess ( [ ] ( ) {  std:: cout <<  "test-exchange - test-queue 绑定成功" ;  } ) ; auto  callback =  std:: bind ( MessageCB,  & channel,  std:: placeholders:: _1,  std:: placeholders:: _2,  std:: placeholders:: _3) ; channel. consume ( "test-queue" ,  "consume-tag" ) . onReceived ( callback) . onError ( [ ] ( const  char  * message) {  std:: cout <<  "订阅 test-queue 队列消息失败: "  <<  message <<  std:: endl; exit ( 0 ) ;  } ) ; ev_run ( loop,  0 ) ; return  0 ; 
} 
all: publish consume
publish: publish.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17
consume: consume.ccg++ -o $@ $^ -lamqpcpp -lev -std=c++17.PHONY:clean
clean:rm publish consume