首页 > 网络 > 云计算 >

Zookeeper:Curator框架应用和常用命令

2016-11-01

Zookeeper:Curator框架应用和常用命令。Curator框架提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。

目录

CuratorFramework介绍

scala代码体现

scala代码结果

Linux命令

1)CuratorFramework介绍

Curator框架提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。 这些特性包括:

自动化的连接管理: 重新建立到ZooKeeper的连接和重试机制存在一些潜在的错误case。 Curator帮助你处理这些事情,对你来说是透明的。
清理API:
    简化了原生的ZooKeeper的方法,事件等
    提供了一个现代的流式接口
提供了Recipes实现: 如前面的文章介绍的那样,基于这些Recipes可以创建很多复杂的分布式应用

Curator框架通过CuratorFrameworkFactory以工厂模式和builder模式创建CuratorFramework实例。 CuratorFramework实例都是线程安全的,你应该在你的应用中共享同一个CuratorFramework实例.

工厂方法newClient()提供了一个简单方式创建实例。 而Builder提供了更多的参数控制。一旦你创建了一个CuratorFramework实例,你必须调用它的start()启动,在应用退出时调用close()方法关闭.
CuratorFramework提供的方法:

方法名 描述
create() 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode
delete() 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode
checkExists() 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode
getData() 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode
setData() 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode
getChildren() 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode
inTransaction() 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交

scala代码体现

package com.donews.zk

import java.util.concurrent.TimeUnit
import kafka.common.TopicAndPartition
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._


/**
  * Created by yuhui on 2016/10/31.
  */
object zkOperate{

  val LOG = LoggerFactory.getLogger(zkOperate.getClass)
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("slave01:2181,slave02:2181,slave03:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("yuhui_test")
      .build()
    client.start()
    client
  }

  def lock(path: String)(body: => Unit) {
    val lock = new InterProcessMutex(client, path)
    lock.acquire()
    try {
      body
    } finally {
      lock.release()
    }

  }

  def tryDo(path: String)(body: => Unit): Boolean = {
    val lock = new InterProcessMutex(client, path)
    if (!lock.acquire(10, TimeUnit.SECONDS)) {
      LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出")
      return false
    }
    try {
      LOG.info("获准运行")
      body
      true
    } finally {
      lock.release()
      LOG.info(s"释放锁 {$path}")
    }

  }

  //zookeeper创建路径
  def ensurePathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  //zookeeper提取offset的方法
  def loadOffsets(topicSet: Set[String], defaultOffset: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
    val kafkaOffsetPath = s"/kafkaOffsets"
    ensurePathExists(kafkaOffsetPath)
    val offsets = for {
    //t就是路径webstatistic/kafkaOffsets下面的子目录遍历
      t <- client.getChildren.forPath(kafkaOffsetPath)
      if topicSet.contains(t)
      //p就是新路径   /webstatistic/kafkaOffsets/donews_website
      p <- client.getChildren.forPath(s"$kafkaOffsetPath/$t")
    } yield {
      //遍历路径下面的partition中的offset
      val data = client.getData.forPath(s"$kafkaOffsetPath/$t/$p")
      //将data变成Long类型
      val offset = java.lang.Long.valueOf(new String(data)).toLong
      (TopicAndPartition(t, Integer.parseInt(p)), offset)
    }
    defaultOffset ++ offsets.toMap
  }

  //zookeeper存储offset的方法
  def storeOffsets(offsets: Map[TopicAndPartition, Long]): Unit = {
    val kafkaOffsetPath = s"/kafkaOffsets"
    if (client.checkExists().forPath(kafkaOffsetPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(kafkaOffsetPath)
    }
    for ((tp, offset) <- offsets) {
      val data = String.valueOf(offset).getBytes
      val path = s"$kafkaOffsetPath/${tp.topic}/${tp.partition}"
      ensurePathExists(path)
      client.setData().forPath(path, data)
    }
  }

  def main(args: Array[String]) {

    //获取到namespace
    println(client.getNamespace)

    //创建路径
    val kafkaOffsetPath = "/kafkaOffsets"
    if (client.checkExists().forPath(kafkaOffsetPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(kafkaOffsetPath)
    }

    //删除路径
    client.delete().forPath("/kafkaOffsets/web/1")

    //存储值
    val offsets : Map[TopicAndPartition, Long] = Map(TopicAndPartition("web",1) ->4444, TopicAndPartition("web",2)->2222 )
    storeOffsets(offsets)

    //获取值
    val topicSet = Set("web")
    val offsetstoMap:Map[TopicAndPartition, Long]= loadOffsets(topicSet,Map(TopicAndPartition("web",1) ->0))
    offsetstoMap.keySet.foreach(line=>{
    val  topicName = line.topic
    val  topicPartition = line.partition
    val data = client.getData.forPath(s"$kafkaOffsetPath/$topicName/$topicPartition")
    val offset = java.lang.Long.valueOf(new String(data)).toLong
    println("路径"+s"$kafkaOffsetPath/$topicName/$topicPartition"+"的值为:"+ offset)
    })

  }

}

scala代码结果

这里写图片描述

这里写图片描述

常用命令

ZooKeeper服务命令:
1. 启动ZK服务: ./zkServer.sh start
2. 查看ZK服务状态: ./zkServer.sh status
3. 停止ZK服务: ./zkServer.sh stop
4. 重启ZK服务: ./zkServer.sh restart
zk客户端命令:
ZooKeeper 命令行工具类似于Linux的shell环境,使用它可以对ZooKeeper进行访问,数据创建,数据修改等操作. 使用 zkCli.sh -server 127.0.0.1:2181 连接到 ZooKeeper 服务,连接成功后,系统会输出 ZooKeeper 的相关环境以及配置信息。
命令行工具的一些简单操作如下:
1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
3. 创建文件,并设置初始内容: create /zk “test” 创建一个新的 znode节点“ zk ”以及与它关联的字符串
4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
5. 修改文件内容: set /zk “zkbak” 对 zk 所关联的字符串进行设置
6. 删除文件: delete /zk 将刚才创建的 znode 删除
7. 退出客户端: quit
8. 帮助命令: help
ZooKeeper 常用四字命令:
ZooKeeper 支持某些特定的四字命令字母与其的交互。它们大多是查询命令,用来获取 ZooKeeper 服务的当前状态及相关信息。用户在客户端可以通过 telnet 或 nc 向 ZooKeeper 提交相应的命令
1. 可以通过命令:echo stat|nc 127.0.0.1 2181 来查看哪个节点被选择作为follower或者leader
2. 使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
3. echo dump| nc 127.0.0.1 2181 ,列出未经处理的会话和临时节点。
4. echo kill | nc 127.0.0.1 2181 ,关掉server
5. echo conf | nc 127.0.0.1 2181 ,输出相关服务配置的详细信息。
6. echo cons | nc 127.0.0.1 2181 ,列出所有连接到服务器的客户端的完全的连接 / 会话的详细信息。
7. echo envi |nc 127.0.0.1 2181 ,输出关于服务环境的详细信息(区别于 conf 命令)。
8. echo reqs | nc 127.0.0.1 2181 ,列出未经处理的请求。
9. echo wchs | nc 127.0.0.1 2181 ,列出服务器 watch 的详细信息。
10. echo wchc | nc 127.0.0.1 2181 ,通过 session 列出服务器 watch 的详细信息,它的输出是一个与 watch 相关的会话的列表。
11. echo wchp | nc 127.0.0.1 2181 ,通过路径列出服务器 watch 的详细信息。它输出一个与 session 相关的路径。

相关文章
最新文章
热点推荐