Zookeeper 简明教程

Zookeeper - API

ZooKeeper 有一个官方的 Java 和 C API 绑定。ZooKeeper 社区为大多数语言(.NET、python 等)提供非官方 API。使用 ZooKeeper API,应用程序可以连接、交互、操作数据、协调,最后从 ZooKeeper 集群断开连接。

ZooKeeper API 具有丰富的一组功能,可以以一种简单安全的方式获得 ZooKeeper 集群的所有功能。ZooKeeper API 提供同步和异步方法。

ZooKeeper 集群和 ZooKeeper API 在各个方面都完全相辅相成,极大地有益于开发人员。让我们在本章中讨论 Java 绑定。

Basics of ZooKeeper API

与 ZooKeeper 集群互动的应用程序称为 ZooKeeper Client 或简称 Client

节点是 ZooKeeper 集群的核心组件,而 ZooKeeper API 提供了一小套方法来使用 ZooKeeper 集群操作所有节点的详细信息。

要与 ZooKeeper 集合进行清晰简洁的交互,客户端应遵循以下步骤。

  1. 连接到 ZooKeeper 集合。ZooKeeper 集合为客户端分配一个会话 ID。

  2. 定期向服务器发送心跳。否则,ZooKeeper 集合会使会话 ID 过期,而客户端需要重新连接。

  3. 只要会话 ID 处于活动状态,即可获取/设置 znode。

  4. 完成所有任务后,断开与 ZooKeeper 集合的连接。如果客户端长时间不活动,则 ZooKeeper 集合将自动断开客户端连接。

Java Binding

让我们在本节中了解最重要的 ZooKeeper API 集合。ZooKeeper API 的核心部分是 ZooKeeper class 。它提供选项,在其构造函数中连接 ZooKeeper 集合,并具有以下方法:

  1. connect - 连接到 ZooKeeper 集合

  2. create - 创建 znode

  3. exists - 检查 znode 是否存在及其信息

  4. getData - 从特定 znode 获取数据

  5. setData - 在特定 znode 中设置数据

  6. getChildren - 获取特定 znode 中可用的所有子节点

  7. delete - 获取特定 znode 及其所有子项

  8. close - 关闭连接

Connect to the ZooKeeper Ensemble

ZooKeeper 类通过其构造函数提供连接功能。构造函数的签名如下所示:

ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)

其中,

  1. connectionString - ZooKeeper 集合主机。

  2. sessionTimeout - 会话超时(以毫秒为单位)。

  3. watcher - 一个实现“观察者”接口的对象。ZooKeeper 集合通过观察者对象返回连接状态。

让我们创建一个新的帮助程序类 ZooKeeperConnection 并添加一个方法 connectconnect 方法创建 ZooKeeper 对象,连接到 ZooKeeper 集合,然后返回对象。

此处 CountDownLatch 用来停止(等待)主进程,直到客户端连接到 ZooKeeper 集群。

ZooKeeper 集群通过 Watcher callback 返回连接状态。客户端连接到 ZooKeeper 集群后,将调用 Watcher 回调,并且 Watcher 回调会调用 CountDownLatchcountDown 方法来释放 await 中主进程的锁。

以下是用于连接到 ZooKeeper 集群的完整代码。

Coding: ZooKeeperConnection.java

// import java classes
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// import zookeeper classes
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperConnection {

   // declare zookeeper instance to access ZooKeeper ensemble
   private ZooKeeper zoo;
   final CountDownLatch connectedSignal = new CountDownLatch(1);

   // Method to connect zookeeper ensemble.
   public ZooKeeper connect(String host) throws IOException,InterruptedException {

      zoo = new ZooKeeper(host,5000,new Watcher() {

         public void process(WatchedEvent we) {

            if (we.getState() == KeeperState.SyncConnected) {
               connectedSignal.countDown();
            }
         }
      });

      connectedSignal.await();
      return zoo;
   }

   // Method to disconnect from zookeeper server
   public void close() throws InterruptedException {
      zoo.close();
   }
}

保存以上代码,它将在下一节中用于连接到 ZooKeeper 集群。

Create a Znode

ZooKeeper 类提供了 create method 用于在 ZooKeeper 集群中创建新 znode。 create 方法的签名如下 −

create(String path, byte[] data, List<ACL> acl, CreateMode createMode)

其中,

  1. path − Znode 路径。例如,/myapp1、/myapp2、/myapp1/mydata1、myapp2/mydata1/myanothersubdata

  2. data − 要存储在指定 znode 路径中的数据

  3. acl − 要创建的节点的访问控制列表。ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本 acl 列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回一个对开放式 znode 的 acl 列表。

  4. createMode − 节点的类型,可以是临时、顺序或同时是这两种类型。这是一个 enum

让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的 create 功能。创建一个文件 ZKCreate.java 。在主方法中,创建一个类型为 ZooKeeperConnection 的对象并调用 connect 方法来连接到 ZooKeeper 集群。

connect 方法将返回 ZooKeeper 对象 zk 。现在,使用自定义 pathdata 调用 zk 对象的 create 方法。

用于创建 znode 的完整程序代码如下 −

Coding: ZKCreate.java

import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

public class ZKCreate {
   // create static instance for zookeeper class.
   private static ZooKeeper zk;

   // create static instance for ZooKeeperConnection class.
   private static ZooKeeperConnection conn;

   // Method to create znode in zookeeper ensemble
   public static void create(String path, byte[] data) throws
      KeeperException,InterruptedException {
      zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);
   }

   public static void main(String[] args) {

      // znode path
      String path = "/MyFirstZnode"; // Assign path to znode

      // data in byte array
      byte[] data = "My first zookeeper app”.getBytes(); // Declare data

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         create(path, data); // Create the data to the specified path
         conn.close();
      } catch (Exception e) {
         System.out.println(e.getMessage()); //Catch error message
      }
   }
}

一旦编译并执行应用程序,将在 ZooKeeper 集群中创建一个具有指定数据的新 znode。你可以使用 ZooKeeper CLI zkCli.sh 来检查它。

cd /path/to/zookeeper
bin/zkCli.sh
>>> get /MyFirstZnode

Exists – Check the Existence of a Znode

ZooKeeper 类提供 exists method 来检查 znode 的存在。如果指定的 znode 存在,它将返回 znode 的元数据。 exists 方法的签名如下 −

exists(String path, boolean watcher)

其中,

  1. path − Znode path

  2. watcher − 布尔值,用于指定是否监视指定的 znode

让我们创建一个新的 Java 应用程序来检查 ZooKeeper API 的“exists”功能。创建一个文件 “ZKExists.java”。在主方法中,使用 “ZooKeeperConnection” 对象创建一个 ZooKeeper 对象“zk”。然后,使用自定义“path”调用“zk”对象的“exists”方法。完整的清单如下 −

Coding: ZKExists.java

import java.io.IOException;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKExists {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws
      KeeperException,InterruptedException {
      return zk.exists(path, true);
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign znode to the specified path

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path of the znode

         if(stat != null) {
            System.out.println("Node exists and the node version is " +
            stat.getVersion());
         } else {
            System.out.println("Node does not exists");
         }

      } catch(Exception e) {
         System.out.println(e.getMessage()); // Catches error messages
      }
   }
}

一旦编译并执行应用程序,你将获得以下输出。

Node exists and the node version is 1.

getData Method

ZooKeeper 类提供了 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下 −

getData(String path, Watcher watcher, Stat stat)

其中,

  1. path − Znode path.

  2. watcher − 类型为 Watcher 的回调函数。ZooKeeper 集群将在指定 znode 的数据更改时通过 Watcher 回调通知。这是一次性通知。

  3. stat - 返回 z 节点的元数据。

我们创建一个新的 Java 应用程序以了解 ZooKeeper API 的 getData 功能。创建文件 ZKGetData.java 。在 main 方法中,使用 he ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk 。然后,使用自定义路径调用 zk 对象的 getData 方法。

以下是获取指定节点数据的完整程序代码 -

Coding: ZKGetData.java

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKGetData {

   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;
   public static Stat znode_exists(String path) throws
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }

   public static void main(String[] args) throws InterruptedException, KeeperException {
      String path = "/MyFirstZnode";
      final CountDownLatch connectedSignal = new CountDownLatch(1);

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path);

         if(stat != null) {
            byte[] b = zk.getData(path, new Watcher() {

               public void process(WatchedEvent we) {

                  if (we.getType() == Event.EventType.None) {
                     switch(we.getState()) {
                        case Expired:
                        connectedSignal.countDown();
                        break;
                     }

                  } else {
                     String path = "/MyFirstZnode";

                     try {
                        byte[] bn = zk.getData(path,
                        false, null);
                        String data = new String(bn,
                        "UTF-8");
                        System.out.println(data);
                        connectedSignal.countDown();

                     } catch(Exception ex) {
                        System.out.println(ex.getMessage());
                     }
                  }
               }
            }, null);

            String data = new String(b, "UTF-8");
            System.out.println(data);
            connectedSignal.await();

         } else {
            System.out.println("Node does not exists");
         }
      } catch(Exception e) {
        System.out.println(e.getMessage());
      }
   }
}

一旦应用程序编译并执行,您会得到以下输出

My first zookeeper app

并且应用程序会等待 ZooKeeper 集成的进一步通知。使用 ZooKeeper CLI zkCli.sh 更改指定的 z 节点的数据。

cd /path/to/zookeeper
bin/zkCli.sh
>>> set /MyFirstZnode Hello

现在,应用程序将打印以下输出并退出。

Hello

setData Method

ZooKeeper 类提供 setData 方法来修改附加在指定的 z 节点中的数据。 setData 方法的签名如下 -

setData(String path, byte[] data, int version)

其中,

  1. path − Znode path

  2. data - 存储在指定 z 节点路径中的数据。

  3. version - z 节点的当前版本。每当数据更改时,ZooKeeper 都会更新 z 节点的版本号。

我们现在创建一个新的 Java 应用程序以了解 ZooKeeper API 的 setData 功能。创建文件 ZKSetData.java 。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk 。然后,使用指定的路径、新数据和节点版本调用 zk 对象的 setData 方法。

以下是修改附加在指定 z 节点中的数据的完整程序代码。

Code: ZKSetData.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.io.IOException;

public class ZKSetData {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to update the data in a znode. Similar to getData but without watcher.
   public static void update(String path, byte[] data) throws
      KeeperException,InterruptedException {
      zk.setData(path, data, zk.exists(path,true).getVersion());
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path= "/MyFirstZnode";
      byte[] data = "Success".getBytes(); //Assign data which is to be updated.

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         update(path, data); // Update znode data to the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage());
      }
   }
}

一旦应用程序编译并执行,指定 z 节点的数据将被更改,可以使用 ZooKeeper CLI, zkCli.sh 来检查它。

cd /path/to/zookeeper
bin/zkCli.sh
>>> get /MyFirstZnode

getChildren Method

ZooKeeper 类提供 getChildren 方法来获取某个特定 z 节点的全部子节点。 getChildren 方法的签名如下 -

getChildren(String path, Watcher watcher)

其中,

  1. path − Znode path.

  2. watcher - 类型为“观察者”的回调函数。当指定的 z 节点被删除或 z 节点下的子元素被创建/删除时,ZooKeeper 集成会通知。这是一次性通知。

Coding: ZKGetChildren.java

import java.io.IOException;
import java.util.*;

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

public class ZKGetChildren {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static Stat znode_exists(String path) throws
      KeeperException,InterruptedException {
      return zk.exists(path,true);
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; // Assign path to the znode

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         Stat stat = znode_exists(path); // Stat checks the path

         if(stat!= null) {

            //“getChildren” method- get all the children of znode.It has two
            args, path and watch
            List <String> children = zk.getChildren(path, false);
            for(int i = 0; i < children.size(); i++)
            System.out.println(children.get(i)); //Print children's
         } else {
            System.out.println("Node does not exists");
         }

      } catch(Exception e) {
         System.out.println(e.getMessage());
      }

   }

}

在运行程序之前,让我们使用 ZooKeeper CLI, zkCli.sh/MyFirstZnode 创建两个子节点。

cd /path/to/zookeeper
bin/zkCli.sh
>>> create /MyFirstZnode/myfirstsubnode Hi
>>> create /MyFirstZnode/mysecondsubmode Hi

现在,编译并运行程序将输出创建的上述 z 节点。

myfirstsubnode
mysecondsubnode

Delete a Znode

ZooKeeper 类提供 delete 方法来删除指定的 z 节点。 delete 方法的签名如下 -

delete(String path, int version)

其中,

  1. path − Znode path.

  2. version - z 节点的当前版本。

我们创建一个新的 Java 应用程序以了解 ZooKeeper API 的 delete 功能。创建文件 ZKDelete.java 。在 main 方法中,创建一个 ZooKeeper 对象 zk 使用 ZooKeeperConnection 对象。然后,使用 zk 对象的指定 path 和节点的版本调用 delete 方法。

删除znode的完整程序代码如下所示−

Coding: ZKDelete.java

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;

public class ZKDelete {
   private static ZooKeeper zk;
   private static ZooKeeperConnection conn;

   // Method to check existence of znode and its status, if znode is available.
   public static void delete(String path) throws KeeperException,InterruptedException {
      zk.delete(path,zk.exists(path,true).getVersion());
   }

   public static void main(String[] args) throws InterruptedException,KeeperException {
      String path = "/MyFirstZnode"; //Assign path to the znode

      try {
         conn = new ZooKeeperConnection();
         zk = conn.connect("localhost");
         delete(path); //delete the node with the specified path
      } catch(Exception e) {
         System.out.println(e.getMessage()); // catches error messages
      }
   }
}