基于zbus的MySQL透明代理(100行)

项目地址 https://git.oschina.net/rushmore/zbus

我们上次讲到zbus网络通讯的核心API:

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor。

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。

image
image

完成大概不到100 行的代码, Cool?Let’s roll!

首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage。

zbus.NET的IoAdaptor提供的个性化事件如下

image

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透传不需要编解码,简单返回ByteBuffer数据public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}

第二步,代理服务接入:

@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Session的chain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start();}

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;  
public class TcpProxyAdaptor extends IoAdaptor {private String targetAddress;public TcpProxyAdaptor(String targetAddress) {this.targetAddress = targetAddress;}// 透传不需要编解码,简单返回ByteBuffer数据public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透传不需要编解码,简单返回ByteBuffer数据public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}@SuppressWarnings("resource")public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306);server.setServerName("TcpProxyServer");server.start();}
}

文章转载自 开源中国社区[https://www.oschina.net]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/395514.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux path环境变量起什么作用,shell基础(5)PATH环境变量的作用和使用方法

释放双眼,带上耳机,听听看~!关于PATH的作用PATH说简单点就是一个字符串变量,当输入命令的时候LINUX会去查找PATH里面记录的路径。比如在根目录/下可以输入命令ls,在/usr目录下也可以输入ls,但其实ls这个命令根本不在这个两个目录下…

python3 自动打包部署war包

2019独角兽企业重金招聘Python工程师标准>>> 1 调用maven 命令打包 mvn -B -f D:/workspace/ksdcourse clean package 2 调用tomcat 部署war包 ; 需要添加 CATALINA_HOME的环境变量 代码如下: #!/usr/bin/python3# -*- coding: utf-8 -*-impo…

day1作业二:多级菜单操作

作业二:多级菜单 (1)三级菜单 (2)可以次选择进入各子菜单 (3)所需新知识点:列表、字典 要求:输入back返回上一层,输入quit退出整个程序 思路: &am…

JDK源码分析(5)之 HashMap 相关

HashMap作为我们最常用的数据类型,当然有必要了解一下他内部是实现细节。相比于 JDK7 在JDK8 中引入了红黑树以及hash计算等方面的优化,使得 JDK8 中的HashMap效率要高于以往的所有版本,本文会详细介绍相关的优化,但是主要还是写 …

位运算-查找数组中唯一成对的数

基础实例一:使用位运算判断数的奇偶性 实例代码: public class Test {public static void main(String[] args) {System.out.println(isOdd(49));System.out.println(isOdd(50));}// 与运算public static boolean isOdd(int i){return (i & 1) ! 0;…

react-native-Cocoapods-Swift-Project

https://reactnative.cn/docs/integration-with-existing-apps/ 1、创建一个xcode工程,single View就行,项目语言选择swift,oc的直接生成就行不用这么麻烦。 2、把跟目录上创建 node的package.json,执行命令 npm init npm install react-nati…

第二阶段站立会议08

站立会议内容: 大家准备继续将代码进行融合,进行测试对一些功能进行优化。 1、会议照片: 2、任务展板: 3、燃尽图: 转载于:https://www.cnblogs.com/smcoder/p/7002539.html

git——学习笔记(三)分支管理

一、创建、合并分支 每次提交,git都往后走一格,串成一跳时间线,head指向的是分支,分支指向提交。master是主分支,dev是另一条分支,分支就像指针一样,合并、删除分支时,修改的都是指针…

阿里巴巴是如何打通 CMDB,实现就近访问的?

CMDB在企业中,一般用于存放与机器设备、应用、服务等相关的元数据。当企业的机器及应用达到一定规模后就需要这样一个系统来存储和管理它们的元数据。有一些广泛使用的属性,例如机器的IP、主机名、机房、应用、region等,这些数据一般会在机器…

[原创]K8_C段旁注工具6.0 新增SMB漏洞扫描

工具: K8_C段旁注工具6.0_0510[K.8]编译: 自己查壳组织: K8搞基大队[K8team]作者: K8拉登哥哥博客: http://qqhack8.blog.163.com发布: 2017/5/24 13:25:54简介: 图片: 功能: 更新历史:6.0 20170510[] C段SMB漏洞扫描(探测系统版本)[] 批量操作-文本比较提取新增内容[] 旁注查…

【公告】社区周刊即日起停刊

各位订阅51CTO社区周刊的小伙伴们,大家好,我是51CTO社区的大管家蘑菇,今天来是想跟大家说,本期周刊将是我们最后一期邮件期刊,没错,是最后一期(请珍惜它~)。或许你会问,停…

springcloud-zuul路由网关

路由网关(zuul) 在微服务架构中,需要多个基础的服务治理组件,包括服务注册与发现、服务消费、负载均衡、断路器、智能 路由、配置管理等,由这个基础组件相互协作,共同组建了一个简单的微服务系统。一个简单的微服务系统如下 图 总…

python DB.fetchall()--获取数据库所有记录列表

查询到的数据格式为列表: 多个元素的列表: 单个元素的列表: 转载于:https://www.cnblogs.com/apple2016/p/5734161.html

Laravel Composer 命令大全

2019独角兽企业重金招聘Python工程师标准>>> ​​​​​​​1、安装 Laravel composer create-project --prefer-dist laravel/laravel 5.xx user-project 2、.env 文件操作 生成 APP_KEY:php artisan key:generate 缓存 .env 配置&#xff…

linux中initrd的含义,Linux2.6 内核的 Initrd 机制解析

1.什么是 Initrdinitrd 的英文含义是 boot loaderinitialized RAM disk,就是由 boot loader 初始化的内存盘。在 linux内核启动前, boot loader 会将存储介质中的 initrd 文件加载到内存,内核启动时会在访问真正的根文件系统前先访…

VBS基础篇 - 常量

VBS基础篇 - 常量 常量:指的是在程序运行过程中其值保持不变的量,它用来保存固定不变的数值,字符串等常数 。 常量的定义:在vbscript中使用使用 Const 指令可以创建名称具有一定含义的字符串型或数值型常量,并给它们赋…

利用深度学习来预测股票价格变动

https://www.toutiao.com/i6644852565341110791/ 利用深度学习来预测股票价格变动(长文,建议收藏) 原创 不靠谱的猫 2019-01-10 21:01:39完整架构概述 在这篇文章中,我将创建一个预测股票价格变动的完整过程。我们将使用生成对抗网…

shell 本地接口自动化

一.基于http/https的接口 一般情况下,当前大多公司在做接口自动化的时候都会使用一些工具;比如:postman/jmeter/python自研开发接口平台。。。 以上的情况,都是在源码与测试使用分离的情况下实践的。也就是说:目前国内…

第50次二级c语言真题,2006年4月全国计算机等级考试二级C语言笔试试卷含答案

一、选择题((1)一(10)每题2分,(11)一(50)每题1分,共60分)下列各题A)、B)、C)、D)四个选项中,只有一个选项是正确的,请将正确选项涂写在答题卡相应位置上,答在试卷上不得分。(1)下列选项中不属于结构化程序设计方法的是…

python hashlib模块

摘要算法简介 Python的hashlib提供了常见的摘要算法,如MD5,SHA1等等。 什么是摘要算法呢?摘要算法又称哈希算法、散列算法。它通过一个函数,把任意长度的数据转换为一个长度固定的数据串(通常用16进制的字符串表示&…