学习笔记-微服务高级(黑马程序员)

Sentinel

  • 测试软件

    • jmeter
  • 雪崩问题

    • 个微服务往往依赖于多个其它微服务,服务提供者I发生了故障,依赖于当前服务的其它服务随着时间的推移形成级联失败
  • 超时处理

    • 设定超时时间,请求超过一定时间没有响应就返回错误信息
  • 仓壁模式

    • 限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,也叫线程隔离
  • 断路器

    • 由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务请求
  • 限流

    • 流量控制,限制业务访问的QPS,避免服务因流量的突增而故障
    • Sentinel
  • Sentinel

    • 运行
      • java -jar sentinel-dashboard-1.8.1.jar
    • 配置
      • server.port
      • sentinel.dashboard.auth.username
        • 默认用户名
      • sentinel.dashboard.auth.password
        • 默认密码
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar

微服务整合

  • 依赖
<!--sentinel-->
<dependency><groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
  • 配置
server:port: 8088
spring:cloud: sentinel:transport:dashboard: localhost:8080

限流

  • 流控模式
  • 直接:对当前资源限流
  • 关联:高优先级资源触发阈值,对低优先级资源限流。
  • 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
  • 流控效果
    • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
      • 默认的处理方式
    • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常
      • 这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
    • 排队等待:让所有的请求按照先后次序排队执行
      • 两个请求的间隔不能小于指定时长

隔离和降级

  • 线程隔离
    • 调用者在调用服务提供者时,给每个调用的请求分配独立线程池
    • 出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽
  • 熔断降级
    • 是在调用方这边加入断路器,统计对服务提供者的调用
    • 如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者

FeignClient整合Sentinel

  • 配置
feign:sentinel:enabled: true # 开启feign对sentinel的支持
  • 实现FallbackFactory
@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {@Overridepublic UserClient create(Throwable throwable) {return new UserClient() {@Overridepublic User findById(Long id) {log.error("查询用户异常", throwable);return new User();}};}
}
  • FallbackFactory注册Bean:
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){return new UserClientFallbackFactory();
}
  • UserClient接口中使用FallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {@GetMapping("/user/{id}")User findById(@PathVariable("id") Long id);
}

线程隔离

  • 线程池隔离
    • 给每个服务调用业务分配一个线程池
    • 利用线程池本身实现隔离效果
  • 信号量隔离
    • 计数器模式,记录业务使用的线程数量
    • 达到信号量上限时,禁止新的请求

熔断降级

  • 断路器控制熔断和放行是通过状态机来完成
  • 状态机
    • closed:关闭状态
      • 断路器放行所有请求,并开始统计异常比例、慢请求比例
      • 超过阈值则切换到open状态
    • open:打开状态
      • 服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑
      • Open状态5秒后会进入half-open状态
    • half-open:半开状态
      • 放行一次请求,根据执行结果来判断接下来的操作。
      • 请求成功:则切换到closed状态
      • 请求失败:则切换到open状态
  • 器熔断策略
    • 慢调用
    • 异常比例
    • 异常数

授权规则

  • 白名单:来源(origin)在白名单内的调用者允许访问

  • 黑名单:来源(origin)在黑名单内的调用者不允许访问

  • 定义RequestOriginParser接口,返回不同的origin

@Component
public class HeaderOriginParser implements RequestOriginParser {@Overridepublic String parseOrigin(HttpServletRequest request) {// 1.获取请求头String origin = request.getHeader("origin");// 2.非空判断if (StringUtils.isEmpty(origin)) {origin = "blank";}return origin;}
}
  • 网关的请求添加请求头
spring:cloud:gateway:default-filters:- AddRequestHeader=origin,gateway
  • 实现BlockExceptionHandler接口,自定义异常时的返回结果
    • 处理请求被限流、降级、授权拦截时抛出BlockException
@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {@Overridepublic void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {String msg = "未知异常";int status = 429;if (e instanceof FlowException) {msg = "请求被限流了";} else if (e instanceof ParamFlowException) {msg = "请求被热点参数限流";} else if (e instanceof DegradeException) {msg = "请求被降级了";} else if (e instanceof AuthorityException) {msg = "没有权限访问";status = 401;}response.setContentType("application/json;charset=utf-8");response.setStatus(status);response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");}
}

规则持久化

  • 规则管理模式
    • 原始模式
      • Sentinel的默认模式,将规则保存在内存,重启服务会丢失
    • pull模式
      • 控制台将配置的规则推送到Sentinel客户端
      • 客户端会将配置规则保存在本地文件或数据库中
      • 定时去本地文件或数据库中查询,更新本地规则
    • push模式
      • 控制台将配置规则推送到远程配置中心Nacos
      • Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新

实现push模式

  • 依赖
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
  • 配置
spring:cloud:sentinel:datasource:flow:nacos:server-addr: localhost:8848 # nacos地址dataId: orderservice-flow-rulesgroupId: SENTINEL_GROUPrule-type: flow # 还可以是:degrade、authority、param-flow
  • 修改sentinel-dashboard源码
    • 重新启动

分布式事务

  • CAP

    • Consistency(一致性)
    • Availability(可用性)
    • Partition tolerance (分区容错性)
  • BASE

    • Basically Available (基本可用)
    • Soft State(软状态)
    • Eventually Consistent(最终一致性)
  • 分布式事务

    • AP模式
      • 各子事务分别执行和提交,允许出现结果不一致
      • 采用弥补措施恢复数据即可,实现最终一致
    • CP模式
      • 各个子事务执行后互相等待,同时提交,同时回滚,达成强一致
      • 事务等待过程中,处于弱可用状态

Seata

  • Seata的架构
    • TC (Transaction Coordinator)
      • 事务协调者
    • TM (Transaction Manager)
      • 事务管理器
    • RM (Resource Manager)
      • 资源管理器
  • 下载
    • http://seata.io/zh-cn/blog/download.html

微服务集成

  • 依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><!--版本较低,1.3.0,因此排除--><exclusion><artifactId>seata-spring-boot-starter</artifactId><groupId>io.seata</groupId></exclusion></exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><!--<version>1.4.2</version>--><version>${seata.version}</version>
</dependency>
  • 配置
seata:registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址# 参考tc服务自己的registry.conf中的配置type: nacosnacos: # tcserver-addr: 127.0.0.1:8848namespace: ""group: DEFAULT_GROUPapplication: seata-tc-server # tc服务在nacos中的服务名称cluster: SHtx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称service:vgroup-mapping: # 事务组与TC服务cluster的映射关系seata-demo: SH

XA模式

  • RM一阶段的工作:

    • 注册分支事务到TC
    • 执行分支业务sql但不提交
    • 报告执行状态到TC
  • TC二阶段的工作:

    • TC检测各分支事务执行状态
      • 如果都成功,通知所有RM提交事务
      • 如果有失败,通知所有RM回滚事务
  • RM二阶段的工作:

    • 接收TC指令,提交或回滚事务
  • 配置文件

seata: dtat-source-proxy-mode: XA
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

AT模式

  • 阶段一RM的工作:

    • 注册分支事务
    • 记录undo-log(数据快照)
    • 执行业务sql并提交
    • 报告事务状态
  • 阶段二提交时RM的工作:

    • 删除undo-log即可
    • 阶段二回滚时RM的工作:
    • 根据undo-log恢复数据到更新前
  • 配置文件

seata: dtat-source-proxy-mode: AT
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

TCC模式

  • 需要实现三个方法
    • Try:资源的检测和预留;
    • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
    • Cancel:预留资源释放,可以理解为try的反向操作。
@LocalTCC
public interface AccountTCCService {@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,@BusinessActionContextParameter(paramName = "money")int money);boolean confirm(BusinessActionContext ctx);boolean cancel(BusinessActionContext ctx);
}

SAGA模式

  • 一阶段
    • 直接提交本地事务
  • 二阶段
    • 成功则什么都不做;失败则通过编写补偿业务来回滚

Redis

Redis持久化

  • RDB持久化

    • Redis Database Backup file
    • Redis数据备份文件,也被叫做Redis数据快照。
      • 把内存中的所有数据都记录到磁盘中
      • 当Redis实例故障重启后,从磁盘读取快照文件,恢复数据
  • RDB执行时机

    • Redis停机时会执行一次RDB
    • Redis内部有触发RDB的机制
      • 可以在redis.conf文件中找到
  • 异步持久化bgsave

    • fork主进程得到一个子进程,共享内存空间
    • 子进程读取内存数据并写入新的RDB文件
    • 用新RDB文件替换旧的RDB文件
  • AOF持久化

  • Append Only File(追加文件)

  • Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件

  • 频率

    • always
    • ererysec
    • no

Redis主从

  • 搭建主从架构

    • 搭建主从集群,实现读写分离
  • 主从数据同步原理

    • 全量同步
      • 第一次同步
    • 增量同步
      • slave重启后同步

Redis哨兵

  • Sentinel

    • 监控
      • 心跳
    • 自动故障恢复
      • 选举新master
    • 通知
      • 通知客户端新的master
  • 心跳检测

    • 主观下线
    • 客观下线

RedisTemplate哨兵模式

  • 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:redis:password: 1234 #如果Redis有密码市一定要配置密码sentinel:master: mymaster #指定master名称nodes: # 指定Redis集群信息- ip地址:27001- ip地址:27002- ip地址:27003
  • RedisTemplate配置主从读写分离
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){// REPLICA_PREFERRED 优先从slave节点读取 return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

Redis分片集群(增强存储能力)

  • 分片集群
    • 每个集群多个master,每个master保存不同数据
    • 每个master多个slave
    • master直接通过ping监测彼此
    • 客户端访问集群任意节点,最终都会被转发到正确的节点
  • 散列插槽
    • redis会把master节点映射到0~1638插槽上
    • 数据的key与插槽绑定

集群伸缩

  • 新增
    • redis-cli -a 密码 --cluster add-node ip地址:port ip地址:7001
  • 分配插槽
    • redis-cli -a 密码 --cluster reshard ip地址:port
  • 删除
    • redis-cli -a 密码 --cluster del-node ip地址:port 节点id

故障转移

  • 自动故障转移

    • 自动
  • 手动故障转移

    • cluster failover命令

RedisTemplate访问分片集群

  • 引入redis依赖
  • 配置分片集群地址
  • 配置读写分离
spring:redis:cluster:nodes:- ip:port- ip:port- ip:port- ip:port- ip:port

多级缓存

  • 多级缓存
    • 浏览器&客户端
    • nginx本地缓存
    • redis缓存
    • JVM进程缓存
    • 数据库

JVM进程缓存

  • 分布式缓存
    • redis
  • 进程本地缓存
    • HashMap
    • GuavaCache
    • Caffeine
      • Spring内部的缓存使用的就是Caffeine

Caffeine

// 构建cache
Cache<String,String> cache = Caffeine.newBuilder().build();
// 存储数据
cache.put("gf","xxx");
// 取值
String gf = cache.getIfPresent("gf");String defaultGF = cache.get("defaultGF",key->{return "xxx" 
});
  • 缓存驱逐策略

    • 基于容器
      • 数量上限
    • 基于时间
      • 设置有效时间
    • 基于引用
      • 不建议使用
  • 基于容器

Cache<String,String> cache = Caffeine.newBuilder().maximumSize(1).build();
  • 基于时间
Cache<String,String> cache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(10)).build();

Lua语法

helloworld

  • 新建
touch hello.lua
  • 添加内容
print("hello world")
  • 运行
lua hello.lua

数据类型

  • 数据类型
    • nil
      • 无效值
    • boolean
    • number
    • string
    • function
    • table
      • 数组key为索引
        • local arr = {‘xx’,‘xx’}
        • print(arr[1])
      • map
        • loacl map = {name=‘xx’,age=2}
        • map[‘name’]
        • map.name
  • 查看数据类型
    • type()
  • 循环遍历table
local arr = {'ar','xx',ss}
for index,value in ipairs(arr) doprint(index,value)
end
loacl map = {name='xx',age=2}
for index,value in ipairs(map) doprint(index,value)
end
  • 函数
function printArr(arr)for index,value in ipairs(arr) doprint(value)end
end
  • 条件控制
if(布尔表达式)
thenelseend
  • 逻辑运算
    • and
    • or
    • not

OpenResty

  • 基于nginx的高性能web平台

  • 安装

    • OpenResty
    • opm
  • 目录

    • /usr/local/openresty
  • 获取请求参数

    • 路径占位符
      • /item/111
      • local id=ngx.var[1]
    • 请求头
      • local headers = ngx.req.get_headers()
    • Get
      • local getParams = ngx.req.get_uri_args()
    • post
      • gx.req.read_body()
      • local postParams = ngx.req.get_post_args()
    • JSON
      • gx.req.read_body()
      • local jsonBody = ngx.req.get_post_data()
  • 封装http请求

    • /usr/lodal/openresty/lualib/common.lua
--封装函数,发送http请求,并解析响应
local function read http(path,params)local resp = ngx.location.capture(path,{method =ngx.HTTP GET,args =params,})if not resp then--记录错误信息,返回404ngx.log(ngx.ERR,"http not found,path:",path ,",args:",args)ngx.exit(404)endreturn resp.body
end
--将方法导出
Local _M = {read_http = read_http
}
return _M
--导入common函数库
local common=require('common')
local read_http = common.read_http
--导入cjson库
local cjson=require('cjson')-- 获取路径参数
local id =ngx.var[1]-- 查询商品信息
local itemJSON =read_http("/item/" .. id, nil)-- 查询库存信息
local stockJSON = read_http("/item/stock/"..id, nil)
-- JSON转化为lua的tablelocal item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock =stock.stock
item.sold = stock.sold
-- 把item序列化为ison 返回结果
ngx.say(cjson.encode(item))

nginx本地缓存

-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache-- 封装查询函数
function read_data(key, expire, path, params)-- 查询本地缓存local val = item_cache:get(key)if not val thenngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)-- 查询redisval = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)endend-- 查询成功,把数据写入本地缓存item_cache:set(key, val, expire)-- 返回数据return val
end
  • 设置缓存时间1800/60
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

Redis 缓存预热

  • 冷启动

    • 服务刚刚启动时,redis中并没有缓存
    • 如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
  • 缓存预热

    • 在实际开发中,可以利用大数据统计用户访问的热点数据,项目启动时将这些热点数据提前查询并保存redis中
  • 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 配置
spring:redis:host: 192.168.150.101
  • 初始化类
 
@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}
}
  • openResty查询redis
    • common.lua
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒local pool_size = 100 --连接池大小local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)if not ok thenngx.log(ngx.ERR, "放入redis连接池失败: ", err)end
end-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)-- 获取一个连接local ok, err = red:connect(ip, port)if not ok thenngx.log(ngx.ERR, "连接redis失败 : ", err)return nilend-- 查询redislocal resp, err = red:get(key)-- 查询失败处理if not resp thenngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)end--得到的数据为空处理if resp == ngx.null thenresp = nilngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)endclose_redis(red)return resp
end-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)local resp = ngx.location.capture(path,{method = ngx.HTTP_GET,args = params,})if not resp then-- 记录错误信息,返回404ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)ngx.exit(404)endreturn resp.body
end
-- 将方法导出
local _M = {  read_http = read_http,read_redis = read_redis
}  
return _M
  • 先查redis,没有再查服务
    • item.lua
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')-- 封装查询函数
function read_data(key, path, params)-- 查询本地缓存local val = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)end-- 返回数据return val
end-- 获取路径参数
local id = ngx.var[1]-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

  • 同步策略
    • 设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
    • 同步双写:在修改数据库的同时,直接修改缓存
    • 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
      • MQ
      • Canal
        • 自己伪装成MySQL的一个slave节点,监听master的binary log变化

Canal

  • 依赖
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version>
</dependency>
  • 配置
canal:destination: heima # canal的集群名字,要与安装canal时设置的名称一致server: 192.168.150.101:11111 # canal服务地址
  • domain

@Data
@TableName("tb_item")
public class Item {@TableId(type = IdType.AUTO)@Idprivate Long id;//商品id@Column(name = "name")private String name;//商品名称private String title;//商品标题private Long price;//价格(分)private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer status;//商品状态 1-正常,2-下架private Date createTime;//创建时间private Date updateTime;//更新时间@TableField(exist = false)@Transientprivate Integer stock;@TableField(exist = false)@Transientprivate Integer sold;
  • 监听器

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {@Autowiredprivate RedisHandler redisHandler;@Autowiredprivate Cache<Long, Item> itemCache;@Overridepublic void insert(Item item) {// 写数据到JVM进程缓存itemCache.put(item.getId(), item);// 写数据到redisredisHandler.saveItem(item);}@Overridepublic void update(Item before, Item after) {// 写数据到JVM进程缓存itemCache.put(after.getId(), after);// 写数据到redisredisHandler.saveItem(after);}@Overridepublic void delete(Item item) {// 删除数据到JVM进程缓存itemCache.invalidate(item.getId());// 删除数据到redisredisHandler.deleteItemById(item.getId());}
}
  • RedisHandler
@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}public void saveItem(Item item) {try {String json = MAPPER.writeValueAsString(item);redisTemplate.opsForValue().set("item:id:" + item.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteItemById(Long id) {redisTemplate.delete("item:id:" + id);}
}

RabbitMQ高级特性

消息可靠性

  • 丢失原因

    • 发送时丢失
      • 生产者发送的消息未送达exchange
      • 消息到达exchange后未到达queue
    • MQ宕机,queue将消息丢失
    • consumer接收到消息后未消费就宕机
  • 解决方案

    • 生产者确认机制
    • mq持久化
    • 消费者确认机制
    • 失败重试机制

生产者消息确认

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
  • 配置

spring:rabbitmq:# simple:同步等待confirm结果,直到超时# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-confirm-type: correlated# 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbackpublisher-returns: truetemplate:# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息mandatory: true
  • ReturnCallback
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}
  • ConfirmCallback
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执
Thread.sleep(2000);

消息持久化

  • 消息持久化机制

    • 交换机持久化
    • 队列持久化
    • 消息持久化
  • 交换机持久化

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}
  • 队列持久化
@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
  • 消息持久化
// 准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(Standardcharsets.UTF_8))// 默认就是持久化的.setDeliveryMode(MessageDeliveryMode.PERSISTENT)    // durable 持久.build();
// 发送消息
rabbitTemplate.convertAndSend(  "simple.queue", message);

消费者确认

  • SpringAMQP三种确认模式:
    • manual
      • 手动ack,需要在业务代码结束后,调用api发送ack。
    • auto
      • 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none
      • 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack none/auto/manual

消费失败重试机制

  • 本地重试
    • 利用Spring的retry机制,在消费者出现异常时利用本地重试
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 失败策略
    • MessageRecovery接口
      • RejectAndDontRequeueRecoverer
        • 重试耗尽后,直接reject,丢弃消息
        • 默认就是这种方式
      • ImmediateRequeueMessageRecoverer
        • 重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer
        • 重试耗尽后,将失败消息投递到指定的交换机
@Configuration
public class ErrorMessageConfig {@Bean// 交换机public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Bean// 队列public Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Bean// RepublishMessageRecoverer,关联队列和交换机public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

死信交换机

  • 死信(dead letter)
    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息满了,无法投递
  • 死信交换机
    • 配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

TTL

  • ttl

    • 队列设置超时时间,配置x-message-ttl属性
  • 死信交换机、死信队列

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.ttl.queue", durable = "true"),exchange = @Exchange(name = "dl.ttl.direct"),key = "ttl"
))
public void listenDlQueue(String msg){log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
  • 声明一个队列,并且指定TTL
@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}
  • 将ttl与交换机绑定
@Bean
public DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
  • 发送消息时,设定TTL
@Test
public void testTTLMsg() {// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000").build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
}

延迟队列

  • 插件

    • DelayExchange
  • 流程

    • 接收消息
    • 判断消息是否具备x-delay属性
    • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
    • 返回routing not found结果给消息发送者
    • x-delay时间到期后,重新投递消息到指定队列
  • 声明延迟交换机

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
  • 发送延迟消息
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}
});

惰性队列

  • 消息堆积

    • 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限
    • 之后发送的消息就会成为死信,可能会被丢弃
  • Lazy Queues

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储
  • 通过命令行将一个运行中的队列修改为惰性队列

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
  • java
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

集群

  • 普通集群/叫标准集群(classic cluster)
    • 会在集群的各个节点间共享部分数据
      • 包括:交换机、队列元信息
      • 不包含:队列中的消息
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失
  • 镜像集群/主从模式
    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主
  • 仲裁队列
    • 与镜像队列一样,都是主从模式,支持主从数据同步
    • 使用非常简单,没有复杂的配置
    • 主从同步基于Raft协议,强一致

仲裁队列

  • 创建仲裁队列
@Bean
public Queue quorumQueue() {return QueueBuilder.durable("quorum.queue") // 持久化.quorum() // 仲裁队列.build();
}
  • 配置
spring:rabbitmq:addresses: ip:port, ip:port, ip:portusername: xxpassword: xxvirtual-host: /

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/825627.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot的“校园新闻网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“校园新闻网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统功能结构图 系统功能界面图 用户登录 校园新…

揭秘!回归测试覆盖率究竟是什么?

回归测试覆盖率是软件测试中的一个重要概念&#xff0c;它衡量的是回归测试过程中&#xff0c;能够覆盖到旧代码的比例。在软件开发的生命周期中&#xff0c;每当有新的代码提交或是修改后&#xff0c;都需要进行回归测试以确保新代码不会对原有的功能产生负面影响。 1.什么是…

从入门到实践,详解 Web 爬虫技术(IP池免费送)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

NLP vs. LLMs: 理解它们之间的区别

作者&#xff1a;Elastic Platform Team 随着人工智能持续发展并在无数行业解决问题&#xff0c;技术的一个关键部分是能够无缝地桥接人类语言和机器理解之间的差距。这就是自然语言处理&#xff08;NLP&#xff09;和大型语言模型&#xff08;LLMs&#xff09;的用武之地。它们…

source map 开发优化工具

什么是 Source map 简单来说 Source map 就是一个存储信息的文件&#xff0c;里面储存着位置信息。 Source map 英文释义&#xff1a;源程序映射。 位置信息&#xff1a;转换后的代码 对应的 转换前的代码 位置映射关系。 有了 Source map&#xff0c;就算线上运行的是转换…

电脑技巧:如何把Edge浏览器扩展程序打包安装到其他浏览器

目录 1、进入浏览器扩展界面 2、找到Edge浏览器扩展插件的路径 3、找到需要扩展的插件ID 4、打开浏览器扩展插件目录 5、进入打包扩展界面 6、 安装到其他浏览器 大家日常使用浏览器的时候通常会安装很多浏览器插件&#xff0c;从而大大提升我们的办公效率&#xff0c;有…

带小数点的String类型数据,如何只取整数?

一、场景引入 如果前端页面存在列表展示用户数据&#xff0c;但是用户数据存在非常多的小数位&#xff0c;从页面来看&#xff0c;数据太多就会不太美观&#xff0c;因此&#xff0c;出于场景美化考虑&#xff0c;在不影响业务功能的情况下&#xff0c;可以只展示整数内容&…

代码编辑器特效爆炸html5

源码介绍 代码编辑器特效爆炸html5&#xff0c;代码高亮显示&#xff0c;输入代码爆炸动态效果显示。非常的帅气&#xff0c;爱不释手~ 效果截图 源码下载 代码编辑器特效爆炸html5

Flask 解决指定端口无法生效问题

问题重现 手动指定的IP端口是app.run(host0.0.0.0, port9304)&#xff0c;但是启动的地址显示的却是http://127.0.0.1:5000。 if __name__ __main__:app.run(host0.0.0.0, port9304)启动地址如下&#xff1a; 解决方案 PyCharm会自动识别出来flask项目&#xff08;即使你…

Py深度学习基础|Numpy基础总结

注&#xff1a;本文来自菜鸟教程学习总结 一、数组属性 NumPy 的数组中比较重要 ndarray 对象属性有&#xff1a; 注意&#xff1a;使用reshape后&#xff0c;数组的结构&#xff08;即元素的排列顺序和内在连接&#xff09;没有改变&#xff0c;但因为返回的是一个视图&#…

机器学习笔记 - 使用 OpenCV 的结构化森林进行边缘检测

一、简述 边缘检测是计算机视觉领域中一项非常重要的任务。这是许多纯计算机视觉任务(例如轮廓检测)的第一步。即使涉及深度学习,较深层也首先学习识别边缘,然后再学习图像的复杂特征。所以,我们可以说边缘检测在计算机视觉领域非常重要。拥有良好且高效的图像边缘检测算法…

Flink KafkaSink分区配置的不同版本对比

Flink KafkaSink分区配置的不同版本对比 在不同版本的Flink中&#xff0c;KafkaSink 分区默认配置方式可能会有一些变化。以下是摘自Flink官方文档不同版本的原文&#xff1a; 1. Flink版本&#xff1a;1.12~1.19 Sink 分区 # 配置项 sink.partitioner 指定了从 Flink 分区到 …

Yoshua Bengio独家专访:我不想把大模型未来押注在Scaling Law上,AGI路上要“注意安全”...

导读 漫长的30年间&#xff0c;数度从主流方向的超然出走&#xff0c;是Bengio的制胜秘诀。这种不盲从主流的风格体现在他研究生涯的方方面面。 90年代末期&#xff0c;神经网络被打入冷宫&#xff0c;Bengio的论文多次遭拒&#xff0c;连学生们也开始担心&#xff0c;和他一起…

【ESP32 手机配网教程】

【ESP32 手机配网教程】 1. 前言2. 先决条件2.1 环境配置2.2 所需零件3.3 硬件连接步骤 3. Web热点手动配网3.1. 准备工作3.2. 编译上传程序3.3. 进行手动配网 4. BLE无线配网4.1. 准备工作**4.2. 编译上传程序4.3. 使用手机APP进行无线配网 5. 总结 1. 前言 欢迎使用ESP32进行…

python将xml格式文件转成png或者pdf格式

本文主要介绍运行NCCL代码时输出的xml文件该如何转成更加容易观看的图格式 如下是举例&#xff0c;服务器上的PCIE相关的topo xml 文件 <system version"1"><cpu numaid"1" affinity"ffffff00,0000ffff,ff000000" arch"x86_64&q…

Next.js多页布局getLayout使用方法

目录 官网解释 直接上代码使用方法展示 1.page页面​编辑 2._app.js页面,也放在pages中​编辑 效果展示 有getLayout展示getLayout返回的页面布局 无getLayout展示默认布局 官网解释 如果需要多个布局&#xff0c;可以添加一个属性getLayout添加到您的页面&#xff0c;允…

2024华中杯数学建模挑战赛选题建议及各题思路来啦!

大家好呀&#xff0c;华中杯数学建模开始了&#xff0c;来说一下初步的选题建议吧&#xff1a; 首先定下主基调&#xff0c; 本次华中杯推荐选择C题目。难度方面A&#xff1e;B&#xff1e;C&#xff0c;A是优化类题目&#xff0c;难度较高&#xff0c;建议参考23国赛A优秀论…

深入探索:Facebook如何重塑社交互动

在当代社会中&#xff0c;社交互动已成为日常生活的核心组成部分。而在众多的社交媒体平台中&#xff0c;Facebook凭借其卓越的用户基础和创新的功能&#xff0c;已经成为了全球最大的社交媒体平台。本文将深入探讨Facebook如何通过其独特的特性和功能&#xff0c;重塑了人们的…

Springboot+Vue线上教学平台赠送配套文档1w字

SpringbootVue线上教学平台赠送配套文档1w字 项目描述 线上教学平台是一个功能丰富的在线教育工具&#xff0c;它为学生、教师和管理员提供了一个集成的学习、交流和管理环境。以下是关于该平台各项功能的简要介绍&#xff1a; 前台门户&#xff1a;前台门户是平台的门面&#…

实战|哈尔滨等保2.0 Linux主机测评过程之身份鉴别

一、身份鉴别 a)应对登录的用户进行身份标识和鉴别&#xff0c;身份标识具有唯一性&#xff0c;身份鉴别信息具有复杂度要求并定期更换。 输入 more /etc/shadow,得知系统所有用户&#xff0c;此语句字段格式有九段。 第一字段&#xff1a;用户名&#xff08;也被称为登录名…