什么是Zookeeper
官方文档上这么解释zookeeper,它是一个分布式服务框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
上面的解释有点抽象,简单来说zookeeper=文件系统+监听通知机制。
Zookeeper应用场景
- 注册中心(Dubbo+Zookeeper)
- 分布式配置中心(统一存放配置文件)
- 分布式锁
- 分布式队列
- 分布式文件系统
zookeeper相关特性
1、高效
适用于大型的分布式系统. 如果写多的话性能不高,因为它要做所有节点之间的数据同步。
2、可靠
支持集群,大部分可用即服务可用
3、顺序
所有写请求由leader生成递增zxid,写操作时,采用mvcc乐观锁机制进行写,保证所有写操作顺序。
4、简洁
对外提供的api非常实用、简洁。仅仅7个api
create - 在树形结构的位置中创建节点
delete - 删除一个节点
exists - 测试节点在指定位置上是否存在
get data - 从节点上读取数据
set data - 往节点写入输入
get chilren - 检索一个节点的子节点列表
sync - 等待传输数据
Zookeeper数据模型
- 临时节点
- 临时顺序节点
- 持久节点
- 持久顺序节点
ACL权限控制
ACL权限模型,实际上就是对树每个节点实现控制
身份的认证有4种方式:
- world:默认方式,相当于全世界都能访问
- auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
- digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
- ip:使用Ip地址认证
Zookeeper单机版本
Linux环境安装Zookeeper(单机)
1.解压zk压缩包
https://logaaaaa.oss-cn-beijing.aliyuncs.com/zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz
- 进入到zk目录
cd zookeeper-3.4.14
3.在zk目录中创建data和logs文件夹
mkdir data
mkdir logs
4.进入到conf目录,修改文件名称
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
dataDir =/usr/local/zookeeper-3.4.14/data
dataLogDir=/usr/local/zookeeper-3.4.14/logs
5.启动zk
./zkServer.sh start
./zkServer.sh status
Java语言操作ZK
依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.7</version>
</dependency>
/**
* 连接地址
*/
private static final String Address="192.168.31.185:2181";
/**
* 超时时间
*/
private static final int TIMEOUT_PARAM=5000;
/**
* 事件通知
*/
private static final String WATCHER="";
/**
* 计数器
*/
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper(Address, TIMEOUT_PARAM, new Watcher() {
//获取到该链接是否成功
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if (state == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
}
}
});
System.out.println("zk正在连接等待");
countDownLatch.await();
System.out.println("开始创建节点");
/**
* 创建节点
* 1。路径名称
* 2。节点value
* 3。节点权限
* 4。节点类型 4种类型 临时节点,持久化节点。临时有序号节点。持久有序号节点。
*/
if(zooKeeper.exists("/test1", false) == null) {
String s = zooKeeper.create("/test1", "gt1f".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(s);
}
zooKeeper.close();
}
创建账号以及设置连接账户
// 创建账号权限 admin可以实现读写操作
Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
ACL acl1 = new ACL(ZooDefs.Perms.ALL, id1);
// 设置zk连接账号
zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());
实现服务注册与发现
服务注册端
@Component
public class ApplicationRunnerImpl implements ApplicationRunner {
@Value("${server.port}")
private String serverPort;
@Override
public void run(ApplicationArguments args) throws Exception {
start();
}
public void start() throws IOException, KeeperException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.31.185", 50000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getState().name());
}
});
String parentPath = "/gtf";
Stat exists = zooKeeper.exists(parentPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
// 如果当前父节点不存在的情况 就创建
if (exists == null) {
zooKeeper.create(parentPath, "gtf".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 将当前服务信息注册到zk上
String data = "http://127.0.0.1:" + serverPort;
zooKeeper.create(parentPath + "/" + serverPort, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
客户端
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper("192.168.31。185", 50000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
// 如果当前连接成功,则开始放心
if (state == Event.KeeperState.SyncConnected) {
System.out.println("zk连接成功~~");
countDownLatch.countDown();
}
}
});
countDownLatch.await();
String path = "/gtf";
// 获取该节点下子集
List<String> children = zooKeeper.getChildren(path, null, new Stat());
for (int i = 0; i < children.size(); i++) {
String pathChildren = path + "/" + children.get(i);
byte[] data = zooKeeper.getData(pathChildren, null, new Stat());
System.out.println("服务接口地址:" + new String(data));
}
}
Linux环境安装Zookeeper(集群)
原理:投票过半机制 zk选举策略领导就采用过半机制 两阶段提交协议
注意:首先需要明确zookeeper选举的规则:leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。
进入conf目录,修改 zoo.cfg
server.1=192.168.31.185:2888:3888
server.2=192.168.31.186:2888:3888
server.3=192.168.31.233:2888:3888
删除data/下的version-2 zookeeper_server.pid
在data目录,新建 myid 内容为 1 2 3 这数值不允许重复
Zookeeper一致性原理
强制一致性:数据一修改数据立马发生变更
弱一致性:数据一修改,在网络同步。允许数据读取之前的数据
最终一致性:数据一修改,允许数据有短暂延迟。
Zookeeper如何解决分布式一致性问题 ZAB协议底层两阶段提交协议
Zk集群是由多个Server节点组成了一个集群,只有一个Leader节点;其他节点类型都是为Follower类型。
Zk中分为三种节点:
- Leader类型 领导类型 负责写的请求,和各个节点同步;
- Follower类型 跟随者 负责读的请求和投票决议
- ObServer类型 观察者 和Follower大部分特征都是一样的,唯一区别就是不能参与选举和投票
为什么要使用ObServer类型,主要不影响原来本身选举的时间的效率、目的是提高客户端查询效率;
zk数据同步
Zk集群是由多个Server节点组成了一个集群,只有一个Leader节点;其他节点类型都是为Follower类型。
- 每个Follower节点保存了Leader节点副本数据;
- 全局保证数据一致性问题
- 分布式读写分开 写的请求统一交给Leader实现,Follower或者是ObServer节点主要实现读的操作;
注意:如果我们连接的节点类型为ObServer或者Follower情况下做写的操作的时候直接转发到Leader实现写
ZAB原子广播协议 核心是保证各个节点数据同步问题,ZAB协议中两种模式
(恢复模式、广播模式)
恢复模式:选举新的Leader
广播模式:解决每个节点数据同步问题
zk与eureka区别
网络分区脑裂
在集群的情况下,一般只会有选举一个master节点、其他节点都是为从接地安慰你,那么如果网络发生抖动或者部分节点无法实现通讯 那么就会导致部分节点从新实现选举,这样就会存在多个master节点。
相同点:Eureka和Zookeeper都是可以实现服务注册中心。
不同点:
Zookeeper保证CP数据一致性问题,(原理ZAB原子广播协议),当zk在某种情况下出现了宕机,会重新实现对zk选举新的领导(恢复机制),如果zk选举的新的领导时间过长的话,
或者投票没有过半数 ,那么会导致整个zk集群环境不可用,这也以为者服务注册中心不可用,所以zk必须保证数据一致性;
Eureka保证ap,设计思想有限考虑可用性、完全去中心化服务注册中心,每个节点都是均等的;Eureka集群没有主从之分,几个节点挂掉也不会影响到整个Eureka的使用,Eureka客户端发现连接时的可用的话,自动切换到下一个eureka连接,只要保证eureka有一个节点存在的话,就可以保证整个服务注册中心使用。
为什么SpringCloud选择Eureka作为注册中心而不是Zookeeper呢?
首先在这时候我们明白一点:
服务注册中心,可以短暂读取以前服务注册列表信息,但是不可用接受节点宕机不可用。
CAP概念
① C:Consistency,一致性,数据一致更新,所有数据变动都是同步的。
② A:Availability,可用性,系统具有好的响应性能。
③ P:Partition tolerance,分区容错性。以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择,也就是说无论任何消息丢失,系统都可用。
目前我们当前技术环境下,不能同时满足CA,但是可以满足CP或者AP
Zookeeper实现分布式锁
Zookeeper事件通知
pom
<!-- java语言连接zk -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
/**
* 连接地址
*/
private static final String Address="192.168.31.185:2181";
/**
* 超时时间
*/
private static final int TIMEOUT_PARAM=5000;
/**
* 事件通知
*/
private static final String WATCHER="";
/**
* 计数器
*/
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) {
//创建zookeeper连接
ZkClient zkClient = new ZkClient(Address, TIMEOUT_PARAM);
String parentPath="/gtf-service";
//开始监听子节点发生的变化
zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println("s:"+s +",节点发生了变化");
list.forEach((t)->{
System.out.println(t);
});
}
});
//监听节点的value数值是否发生变化
zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
//节点内容是否发生变化
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("s:"+s+"0:"+o);
}
//监听该节点是否可以删除
@Override
public void handleDataDeleted(String s) throws Exception {
System.err.println("s删除:"+s);
}
});
// zkClient.writeData(parentPath,"gtf-servicexiugaui");
zkClient.delete(parentPath+"/zjiedian");
while (true){
}
// zkClient.close();
}
Zookeeper分布式锁实现
Zookeeper实现分布式锁的思路:
节点保证唯一、事件通知、临时节点(生命周期和Session会关联)
创建分布式锁原理:
- 多个jvm同时在Zookeeper上创建相同的临时节点(lockPath)
- 因为临时节点路径保证唯一的性,只要谁能够创建成功谁就能够获取锁,就可以开始执行业务逻辑;
- 如果节点已经给其他请求创建的话或者是创建节点失败,当前的请求实现等待;
释放锁的原理:
.因为我们采用临时节点,当前节点创建成功,表示获取锁成功;正常执行完业务逻辑调用Session关闭连接方法,当前的节点会删除;----释放锁
其他正在等待请求,采用事件监听如果当前节点被删除的话,有重新进入到获取锁流程;
临时节点+事件通知
使用模版方式进行分布式锁
创建一个接口
/**
*
*/
public interface Lock {
/**
* 获取锁
*/
void getLock();
/**
* 释放锁
*
*/
void unlock();
}
创建一个模版
public abstract class AbstractTemplateLock implements Lock{
@Override
public void getLock() {
//定义一个共同抽象的骨架。
if (tryLock()){
System.out.println("》》》"+Thread.currentThread().getName() + "获取锁成功");
}else {
//获取锁失败 ,开始等待。
waitLock();//时间监听
getLock();
}
}
protected abstract void waitLock() ;
protected abstract boolean tryLock() ;
protected abstract boolean unLockImpl() ;
@Override
public void unlock() {
//关闭zk连接
unLockImpl();
}
}
创建一个实现
public class ZkTemplzateLock extends AbstractTemplateLock {
/**
* 连接地址
*/
private static final String Address="192.168.31.185:2181";
/**
* 超时时间
*/
private static final int TIMEOUT_PARAM=5000;
/*
创建zk连接
*/
private ZkClient zkClient = new ZkClient(Address, TIMEOUT_PARAM);
/**
* 共同的创建锁 名称
*/
private String localPath="/lockgtf";
private CountDownLatch countDownLatch=null;
@Override
protected void waitLock() {
//事件监听,监听节点是否被删除。
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
};
zkClient.subscribeDataChanges(localPath, iZkDataListener);
if (countDownLatch == null) {
countDownLatch=new CountDownLatch(1);
}
try {
countDownLatch.await();//如果当前计数器不为0,一直等待。
} catch (InterruptedException e) {
e.printStackTrace();
}
//1。如果当前节点被删除的节点下,开始重新获取锁。
zkClient.unsubscribeDataChanges(localPath,iZkDataListener);
}
@Override
protected boolean tryLock() {
//获取锁的思想,多个节点同时创建,只要有一个创建成功 就是创建成功
try {
zkClient.createEphemeral(localPath);
return true;
}catch (Exception e) {
//如果创建节点已经存在 就会异常。
return false;
}
}
@Override
protected boolean unLockImpl() {
if (zkClient!=null) {
zkClient.close();
System.out.println(Thread.currentThread().getName() + "释放锁成功");
return true;
}
return false;
}
}