【README】
Java local access to a zk cluster; see https://blog.csdn.net/PacosonSWJTU/article/details/111404364
【1】 Client listens for changes to zk nodes
1) zk client code: the client side of the HTTP access (queries zk for the online/offline status of the HTTP servers)
(To zk, both the HTTP server and the HTTP client are zk clients; this is the key point to understand. HTTP is used here only as an example to make the idea concrete; any other request protocol could be used instead.)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/** Distributed client */
public class DistributeClient {

    public static void main(String[] args) throws Exception {
        DistributeClient client = new DistributeClient();
        /* 1 - get the zk connection */
        client.getZkConn();
        /* 2 - register the listener on the node */
        client.registerListener();
        /* 3 - business logic */
        client.doBusi();
    }

    /** zk client */
    private ZooKeeper zkClient;

    /** zk server connect string */
    private final static String connectString = "192.168.163.201:2181,192.168.163.202:2181,192.168.163.203:2181";

    /** session timeout (ms) */
    private final static int sessionTimeout = 3000;

    /** Get the child nodes of /servers and (re)register the watcher */
    private void registerListener() throws KeeperException, InterruptedException {
        /* watch the /servers path */
        List<String> children = zkClient.getChildren("/servers", true);
        /* collect the hostnames stored in the server nodes */
        ArrayList<String> hosts = new ArrayList<>();
        for (String child : children) {
            byte[] data = zkClient.getData("/servers/" + child, false, null);
            hosts.add(new String(data));
        }
        /* print all hostnames to the console */
        System.out.println(hosts);
    }

    private void doBusi() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE); // block the process so the watcher keeps running
    }

    /** 0 - get the zk connection */
    public ZooKeeper getZkConn() throws IOException {
        /* connect to the zk servers */
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    // watchers are one-shot, so re-register and print the latest server list
                    registerListener();
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        return zkClient;
    }
}
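Note: both DistributeClient above and DistributeServer in 【2】 assume that the persistent parent node /servers already exists; otherwise getChildren and create fail with NoNodeException. A minimal one-off setup sketch (a hypothetical helper of mine, not part of the original code) that creates it could look like this:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/** Hypothetical one-off setup: create the persistent /servers parent node if it is missing. */
public class CreateServersRoot {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(
                "192.168.163.201:2181,192.168.163.202:2181,192.168.163.203:2181",
                3000,
                new Watcher() {
                    @Override
                    public void process(WatchedEvent event) { /* no-op */ }
                });
        if (zk.exists("/servers", false) == null) {
            // persistent (not ephemeral), so it survives client sessions
            zk.create("/servers", "servers-root".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.close();
    }
}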
2) zk node operations on CentOS 8
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/server "hadoo1202"
Created /servers/server0000000002
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/server "hadoop1203"
Created /servers/server0000000003
[zk: localhost:2181(CONNECTED) 3] create -e -s /servers/server "hadoop1204"
Created /servers/server0000000004
[zk: localhost:2181(CONNECTED) 4] quit
Quitting...
2020-12-20 00:25:34,695 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x1767ab9b8f10004 closed
2020-12-20 00:25:34,698 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x1767ab9b8f10004
[root@localhost zookeeper-3.4.10]#
3) Java log (the watcher fires on each change under /servers and prints the current host list; since the nodes were created with -e they are ephemeral, so when the zkCli session above quits they are removed and the last line is an empty list)
[hadoo1202]
[hadoo1202]
[hadoop1203, hadoo1202]
[hadoop1204, hadoop1203, hadoo1202]
[]
【2】 zk client code: the server side of the HTTP access (writes the HTTP server's online status to zk)
import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/** Distributed server */
public class DistributeServer {

    public static void main(String[] args) throws Exception {
        DistributeServer server = new DistributeServer();
        /* 1 - get the zk connection */
        server.getZkConn();
        /* 2 - register this server under the watched node */
        server.registerServer(args[0]); // pass the server name, e.g. hadoop102
        /* 3 - business logic */
        server.doBusi();
    }

    /** zk client */
    private ZooKeeper zkClient;

    /** zk server connect string */
    private final static String connectString = "192.168.163.201:2181,192.168.163.202:2181,192.168.163.203:2181";

    /** session timeout (ms) */
    private final static int sessionTimeout = 3000;

    /**
     * Register this server: create an ephemeral sequential node under /servers
     * whose data is the hostname.
     */
    private void registerServer(String hostname) throws KeeperException, InterruptedException {
        String path = zkClient.create("/servers/server", hostname.getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online");
        System.out.printf("path = %s%n", path);
    }

    private void doBusi() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE); // block the process so the ephemeral node stays alive
    }

    /** 0 - get the zk connection */
    public ZooKeeper getZkConn() throws IOException {
        /* connect to the zk servers */
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // try {
                //     /* 3 - get the child nodes and watch for changes */
                //     System.out.println("-------watcher start---------");
                //     zkClient.getChildren("/", true).stream().forEach(System.out::println);
                //     System.out.println("-------watcher end ---------");
                // } catch (KeeperException | InterruptedException e) {
                //     e.printStackTrace();
                // }
            }
        });
        return zkClient;
    }
}
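The server simply blocks forever and is killed externally in step5/step6 below, so its ephemeral node is only removed once the session ends. An optional variation (my own sketch, not in the original code): close the ZooKeeper session explicitly on shutdown so the node disappears right away.

/* Hypothetical addition to DistributeServer: closing the session deletes the
   ephemeral /servers/serverXXXXXXXXXX node, so DistributeClient sees the
   server go offline without waiting for the session timeout. */
public void goOffline() throws InterruptedException {
    if (zkClient != null) {
        zkClient.close();
    }
}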
step1) Start DistributeServer with argument hadoop102
-- log
hadoop102 is online
path = /servers/server0000000005
step2) Check the DistributeClient console
-- log
[hadoop102]
step3) Start DistributeServer with argument hadoop103
-- log
hadoop103 is online
path = /servers/server0000000006
step4) Check the DistributeClient console
-- log
[hadoop103, hadoop102]
step5) Stop the DistributeServer process that was started with hadoop103
Check the DistributeClient console
-- log
[hadoop102]
step6) Stop the DistributeServer process that was started with hadoop102
Check the DistributeClient console
-- log
[]