1、连接、发送、发送异常、重连
 
package rabbitmqimport ("encoding/json""fmt""time""github.com/sirupsen/logrus""github.com/streadway/amqp"
)type RabbitMQ struct {conn            *amqp.Connectionchannel         *amqp.Channelconfigs         RabbitMqConfigconnErrorChan   chan *amqp.ErrorreturnErrorChan chan amqp.ReturnactivateChan    chan interface{}
}func NewRabbitMQ() *RabbitMQ {return &RabbitMQ{}
}
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s",cfg.User,cfg.PassWord,cfg.Addr,cfg.VHost))if err != nil {return err}r.conn = connr.configs = cfgif err := r.initChannel(); err != nil {return err}if err := r.exchangeDeclare(); err != nil {return err}if err := r.queueDeclare(); err != nil {return err}if err := r.queueBind(); err != nil {return err}r.connErrorChan = make(chan *amqp.Error, 1)r.conn.NotifyClose(r.connErrorChan)go r.reopen()r.returnErrorChan = make(chan amqp.Return, 1)r.channel.NotifyReturn(r.returnErrorChan)go r.messagePushError()r.activateChan = make(chan interface{}, 1)return nil
}
func (r *RabbitMQ) reopen() {for {select {case err := <-r.connErrorChan:logrus.WithError(err).Error("RabbitMq server exception retry")if r.conn != nil {r.conn = nil        r.activateChan <- 1 }time.Sleep(time.Second * time.Duration(r.configs.Interval))if err := r.Init(r.configs); err != nil {logrus.WithError(err).Error("reopen queue rabbitmq")continue}logrus.Info("reopen rabbitmq success ")return}}
}
func (r *RabbitMQ) messagePushError() {for {select {case v, ok := <-r.returnErrorChan:if !ok {continue}logrus.WithFields(map[string]interface{}{"code":    v.ReplyCode,"message": v.ReplyText,"content": string(v.Body),}).Error("send to rabbitmq failed")case <-r.activateChan:logrus.Info("The current connection has been interrupted")return}}
}
func (r *RabbitMQ) initChannel() error {channel, err := r.conn.Channel()if err != nil {return err}if err := channel.Qos(1,     0,     false, ); err != nil {return err}r.channel = channelreturn nil
}
func (r *RabbitMQ) exchangeDeclare() error {exchange := r.configs.RabbitmqExchangereturn r.channel.ExchangeDeclare(exchange.Name,exchange.Kind,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)
}
func (r *RabbitMQ) queueDeclare() error {_, err := r.channel.QueueDeclare(r.configs.RabbitmqQueue.Name,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)return err
}
// queueBind binds the configured queue to the configured exchange, using the
// queue's own name as the routing key.
func (r *RabbitMQ) queueBind() error {
	var (
		queue    = r.configs.RabbitmqQueue.Name
		exchange = r.configs.RabbitmqExchange.Name
	)
	return r.channel.QueueBind(queue, queue, exchange, r.configs.NoWait, nil)
}
func (r *RabbitMQ) Send(message interface{}) error {messageByte, err := json.Marshal(message)if err != nil {return err}err = r.channel.Publish("",                           r.configs.RabbitmqQueue.Name, true,                         false,                        amqp.Publishing{Headers:         amqp.Table{},ContentType:     "text/plain",ContentEncoding: "",DeliveryMode:    amqp.Persistent, Body:            messageByte,},)if err != nil {return err}return nil
}
func (r *RabbitMQ) Close() error {if err := r.conn.Close(); err != nil {return err}return nil
}
// RabbitMqConfig is the full set of settings consumed by RabbitMQ.Init.
type RabbitMqConfig struct {
	// Addr is placed after the '@' in the AMQP URL (the usage example passes
	// "host:port").
	Addr string `mapstructure:"addr"`
	// VHost is appended to the URL directly after Addr, with no separator added.
	VHost string `mapstructure:"vhost"`
	// User and PassWord are the credentials placed in the AMQP URL.
	User     string `mapstructure:"user"`
	PassWord string `mapstructure:"password"`
	// Durable, AutoDelete, Internal and NoWait are passed to both the exchange
	// and the queue declaration; NoWait is also passed to the queue binding.
	Durable    bool `mapstructure:"durable"`
	AutoDelete bool `mapstructure:"auto_delete"`
	Internal   bool `mapstructure:"internal"`
	NoWait     bool `mapstructure:"nowait"`
	// Interval is the pause, in seconds, before each reconnect attempt.
	Interval int `mapstructure:"interval"`
	// RabbitmqExchange and RabbitmqQueue describe the exchange/queue pair that
	// Init declares and binds.
	RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`
	RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}

// RabbitmqExchange names the exchange to declare and its kind (the usage
// example passes "direct").
type RabbitmqExchange struct {
	Name string `mapstructure:"name"`
	Kind string `mapstructure:"kind"`
}

// RabbitmqQueue names the queue to declare; the same name is used as the
// binding and publishing routing key.
type RabbitmqQueue struct {
	Name string `mapstructure:"name"`
}
 
 
 
2、调用示例
 
package mainimport ("standard/rabbitmq/rmq"    "github.com/sirupsen/logrus"
)func main() {cfg := rabbitmq.RabbitMqConfig{Addr:       "127.0.0.1:5672",VHost:      "/",User:       "guest",PassWord:   "guest",Durable:    true,AutoDelete: false,Internal:   false,NoWait:     false,RabbitmqExchange: rabbitmq.RabbitmqExchange{Name: "exchange.test",Kind: "direct",},RabbitmqQueue: rabbitmq.RabbitmqQueue{Name: "queue.test",},Interval: 2,}err := rabbitmq.NewRabbitMQ().Init(cfg)if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("init rabbitmq success")select {}
}