文章目录

1. ZooKeeper 基本概率1.1 服务架构1.2 分层命名空间1.3 简单的 API

2. 结点状态信息 Stat3. watcher 机制3.1 一次性监听3.2 长久监听

4. Session 会话机制5. ZooKeeper 权限控制 (ACL)5.1 Scheme 权限模式5.2 Id 授权对象5.3 Permission 权限类型

6. 自己实现配置中心6.1 读取本地自定义配置6.2 监听 ZooKeeper 动态加载配置

7. Curator 框架7.1 Curator 实现分布式锁7.2 Curator 实现 Leader 选举

1. ZooKeeper 基本概率

Apache ZooKeeper 提供高可靠的分布式协调服务, 用于维护配置信息, 命名, 提供分布式同步和提供组服务

官网地址: https://zookeeper.apache.org

1.1 服务架构

1.2 分层命名空间

ZooKeeper 提供的命名空间很像标准文件系统, 名称是由 / 和结点名称 组成路径信息

1.3 简单的 API

ZooKeeper 提供非常简单的编程接口

create 在树中的某个位置创建一个节点delete 删除一个节点exists 测试节点是否存在于某个位置get data 从节点读取数据set data 将数据写入节点get children 检索节点的子节点列表sync 等待数据传播

2. 结点状态信息 Stat

通过 get 命令可以获取结点状态的详细信息

ZooKeeper 为数据节点增加三个版本信息, 对数据节点任何更新操作都会引起版本号的变化, 就是用来实现乐观锁机制的“写入校验”

3. watcher 机制

ZooKeeper 提供了分布式数据的发布/订阅功能, ZooKeeper 允许客户端向服务端注册一个 watcher 监听, 当服务端的一些指定事件触发了 watcher, 那么服务端就会向客户端发送一个事件通知

对指定节点设置监听的命令

3.1 一次性监听

监听指定 path 节点的修改和删除事件, 一次性触发

get [-s] [-w] path

stat [-w] path

# 例如

get -w /node

# 在其他窗口执行下面命令,会触发相关事件

set /node 123

delete /node

监控指定 path 的子节点的添加和删除事件

`ls [-s] [-w] [-R] path

# 例如

ls -w /node

# 在其他窗口执行下面命令,会触发相关事件

create /node/node1

delete /node/node1

3.2 长久监听

addWatch [-m mode] path

mode 支持两种模式

RSISTENT, 持久化订阅, 针对当前节点的修改和删除事件, 以及当前节点的子节点的删除和新增事件 RSISTENT_RECURSIVE(默认实现), 持久化递归订阅, 在 PERSISTENT 的基础上, 增加了子节点修改的事件触发, 以及子节点的子节点的数据变化都会触发相关事件 (满足递归订阅特性)

4. Session 会话机制

客户端向 ZooKeeper Server 发起连接请求, 此时状态为 CONNECTING当连接建立好之后, Session 状态转化为 CONNECTED, 此时可以进行数据的 IO 操作如果 Client 和 Server 的连接出现丢失, 则 Client 又会变成 CONNECTING 状态如果会话过期或者主动关闭连接时, 此时连接状态为 CLOSE如果是身份验证失败, 直接结束

5. ZooKeeper 权限控制 (ACL)

提供了一套 ACL 权限控制机制来保证数据的安全

使用 scheme : id : permission 来标识

scheme 权限模式, 标识授权策略id 授权对象permission 授予的权限

ZooKeeper 的权限控制是基于每个 znode 节点的, 需要对每个节点设置权限, 每个 znode 支持设置多种权限控制方案和多个权限, 子节点不会继承父节点的权限, 客户端无权访问某节点, 但可能可以访问它的子节点

5.1 Scheme 权限模式

world: 默认方式, 相当于全部都能访问auth: 代表已经认证通过的用户 (cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)digest: 即用户名: 密码这种方式认证, 这也是业务系统中最常用的. 用 username:password 字符串来产生一个 MD5 串, 然后该串被用来作为 ACL ID. 认证是通过明文发送 username:password 来进行的, 当用在 ACL 时, 表达式为 username:base64, base64 是 password 的 SHA1 摘要的编码ip: 通过 ip 地址来做权限控制, 比如 ip:192.168.1.1 表示权限控制都是针对这个 ip 地址的. 也可以针对网段 ip:192.168.1.1/24, 此时 addr 中的有效位与客户端 addr 中的有效位进行比对

5.2 Id 授权对象

指权限赋予的用户或一个指定的实体, 不同的权限模式下, 授权对象不同

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1ANUqfpV-1652356235099)(en-resource://database/2193:1)]

Id ipId1 = new Id("ip", "192.168.190.1");

Id ANYONE_ID_UNSAFE = new Id("world", "anyone");

5.3 Permission 权限类型

指通过权限检查后可以被允许的操作

Create 允许对子节点 Create 操作Read 允许对本节点 Get Children 和 Get Data 操作Write 允许对本节点 Set Data 操作Delete 允许对子节点 Delete 操作Admin 允许对本节点 setAcl 操作

权限模式 (Schema) 和授权对象 (Id) 主要用来确认权限验证过程中使用的验证策略: 比如 ip 地址, digest:username:password, 匹配到验证策略并验证成功后, 再根据权限操作类型来决定当前客户端的访问权限

6. 自己实现配置中心

6.1 读取本地自定义配置

通过 CustomEnvironmentPostProcessor 类实现 Spring 的 EnvironmentPostProcessor 接口在 META-INF/spring.factories 中配置接口和实现类的全限定名信息加载 custom.properties 中的配置信息可以在 Environment 中查询到用户自定义的配置信息

EnvironmentPostProcessor 的实现类

public class CustomEnvironmentPostProcessor implements EnvironmentPostProcessor {

private static final String PROPERTIES_RESOURCE_LOCATION = "custom.properties";

private final Properties properties = new Properties();

@Override

public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {

Resource resource = new ClassPathResource(PROPERTIES_RESOURCE_LOCATION);

environment.getPropertySources().addLast(loadProperties(resource));

}

private PropertySource loadProperties(Resource resource) {

try {

properties.load(resource.getInputStream());

return new PropertiesPropertySource(resource.getFilename(), properties);

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

META-INF 下的 spring.factories 文件

org.springframework.boot.env.EnvironmentPostProcessor=com.example.springbootZooKeeper.CustomEnvironmentPostProcessor

custom.properties 文件

test.info=Hello SPI

使用自定义配置文件的业务代码

@RestController

public class ConfigController {

@Autowired

Environment environment;

@GetMapping("/env")

public String env(){

return environment.getProperty("test.info");

}

}

6.2 监听 ZooKeeper 动态加载配置

7. Curator 框架

Curator 是 ZooKeeper 的客户端, 对其 api 的封装以及扩展

主要特性

Elections (选举)Locks (分布式锁)

Shared Reentrant Lock: 全局同步的完全分布式锁, 这意味着在任何时间快照中, 没有两个客户端认为他们持有相同的锁Shared Lock: 类似于 Shared Reentrant Lock 不可重入Shared Reentrant Read Write Lock: 跨 JVM 工作的可重入读/写互斥锁Shared Semaphore: 跨 JVM 实现 SemaphoreMulti Shared Lock: 将多个锁作为单个实体进行管理的容器. 当调用 acquire() 时, 将获取所有锁. 如果失败, 则释放所有获取的路径. 同样, 当调用 release() 时, 所有锁都被释放 (忽略失败)

Barriers (屏障)

Barrier: 分布式系统使用 Barrier 来阻塞一组节点的处理, 直到满足一个条件, 此时所有节点都可以继续进行Double Barrier: 双屏障使客户端能够同步计算的开始和结束. 当足够多的进程加入 barrier 时, 进程开始计算, 并在计算完成后离开 barrier

Counters (计数器)

Shared Counter: 管理共享整数Distributed Atomic Long: 分布式原子自增的计数器

Caches (缓存)Nodes/Watches (结点/观察者)Queues (队列)

7.1 Curator 实现分布式锁

利用 ZooKeeper 同级节点的唯一性, 多个进程往 ZooKeeper 的指定节点下创建一个相同名称的节点, 只有一个能成功, 其他是创建失败 创建失败的节点全部通过 ZooKeeper 的 watcher 机制来监听这个节点的变化, 一旦监听到子节点的删除事件, 则再次触发所有进程去写锁

如果遇到大批量访问场景会对性能要求高, 可以通过有序节点来实现分布式锁, 客户端都往指定的节点下注册一个临时有序节点, 越早创建的节点, 节点的顺序编号就越小, 将子节点中最小的节点设置为获得锁

如果自己的节点不是最小的, 每个节点只需要监听比自己小的节点, 当比自己小的节点删除以后, 客户端会收到 watcher 事件, 此时再次判断自己的节点是不是所有子节点中最小的, 如果是则获得锁

代码样例

// 创建锁对象

InterProcessMutex lock = new InterProcessMutex(CuratorFramework, NODE_PATH);

// 尝试获取锁

lock.acquire(100, TimeUnit.MILLISECONDS);

System.out.printf("获取锁成功");

// 锁释放

lock.release();

System.out.printf("释放锁成功");

源码解读

public InterProcessMutex(CuratorFramework client, String path){

// ZooKeeper 利用 path 创建临时顺序节点,实现公平锁的核心

this(client, path, new StandardLockInternalsDriver());

}

// 无限等待获取锁

public void acquire()throws Exception {

if ( !internalLock(-1, null)){

throw new IOException("Lost connection while trying to acquire lock: " + basePath);

}

}

// 限时等待获取锁

public boolean acquire(long time, TimeUnit unit)throws Exception {

return internalLock(time, unit);

}

// 锁释放

public void release() throws Exception {

Thread currentThread = Thread.currentThread();

InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);

.....

// 锁重入次数

int newLockCount = lockData.lockCount.decrementAndGet();

.....

try {

this.internals.releaseLock(lockData.lockPath);

} finally {

this.threadData.remove(currentThread);

}

.....

}

7.2 Curator 实现 Leader 选举

Elections 有两种实现

Leader Latch: 核心思想是初始化多个 LeaderLatch, 然后在等待几秒钟后, Curator 会自动从中选举出 Leader, leader 会一直占用领导权Leader Election: 让所有的实例轮流当 Leader, Leader 的实例在释放领导权之后, 该实例还可以再次竞争 Leader, 选举出来的 Leader 实例不会一直占有领导权

LeaderLatch leaderLatch = new LeaderLatch(CuratorFramework, NODE_PATH);

leaderLatch.addListener(xxxxx);

leaderLatch.addListener(xxxxx);

leaderLatch.addListener(xxxxx);

leaderLatch.addListener(xxxxx);

leaderLatch.start();

查看原文