zookeeper实现队列_Queue
根据zookeeper的官方文档改写的demo,加了详细的注释。
Queue.java
package com.usfot;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.nio.ByteBuffer;import java.util.List;/** * Created by liyanxin on 2015/3/17. */public class Queue implements Watcher { private static final String addr = "127.0.0.1:2181"; private String root; private ZooKeeper zk = null; private Integer mutex; /** * Constructor of producer-consumer queue * * @param root */ public Queue(String root) { this.root = root; try { //连接zk服务器 zk = new ZooKeeper(addr, 10 * 10000, this); } catch (IOException e) { e.printStackTrace(); } mutex = new Integer(-1); // Create ZK node name if (zk != null) { try { //建立根目录节点 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } } /** * 当znode上事件触发,唤醒相应的等待线程 * * @param event */ public void process(WatchedEvent event) { synchronized (mutex) { //System.out.println("Process: " + event.getType()); mutex.notify(); } } /** * Add element to the queue. * * @param i * @return */ boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); zk.create(root + "/element" + i, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("=========Produce znode========="); return true; } /** * Remove first element from the queue. 
* * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { Listlist = zk.getChildren(root, true); if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tempValue = new Integer(s.substring(7)); //System.out.println("Temporary value: " + tempValue); if (tempValue < min) min = tempValue; } System.out.println("Temporary value: " + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }}
QueueTest.java
package com.usfot;import org.apache.zookeeper.KeeperException;/** * 测试基于zookeeper的生产者和消费者队列 * 测试过程: * 一个线程充当生产者,一个线程充当消费者 * Created by liyanxin on 2015/3/17. */public class QueueTest { public static void main(String args[]) { Producer producer = new Producer(new Queue("/app1")); //生产者线程启动 producer.start(); // 启动多个线程作为消费者,但在当前的分布式环境中,消费者之间会竞争某些资源,也是说任务不是 // 同步进行的,导致一个消费者想要得到某个节点的数据时,而这个节点却被另一个消费者删除了, // 导致KeeperException$NoNodeException。线程是不安全的。// for (int i = 0; i < 100; i++) {// Consumer consumer = new Consumer(new Queue("/app1"));// consumer.start();// } // 消费者线程 Consumer consumer = new Consumer(new Queue("/app1")); consumer.start(); }}/** * 生产者线程 */class Producer extends Thread { private Queue queue; public Producer(Queue queue) { this.queue = queue; } @Override public void run() { // 该生产者线程创建1000个znode for (int i = 0; i < 100; i++) { try { queue.produce(i); Thread.sleep(1000); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }}/** * 消费者线程 */class Consumer extends Thread { private Queue queue; public Consumer(Queue queue) { this.queue = queue; } // 消费者线程 // 不断的从zk服务器中读取znode,进行操作。 @Override public void run() { try { while (true) { int retvalue = queue.consume(); System.out.println("thread_name=" + Thread.currentThread().getName() + ":" + retvalue); Thread.sleep(1000); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }}
运行结果符合预期,如下:
...............................
=========Produce znode=========
Temporary value: /app1/element97
thread_name=Thread-1:97
=========Produce znode=========
Temporary value: /app1/element98
thread_name=Thread-1:98
=========Produce znode=========
Temporary value: /app1/element99
thread_name=Thread-1:99
Going to wait

最后消费者线程打印 "Going to wait" 并进入等待状态,只有当有新的 znode 被创建或有其他事件触发后,才会唤醒消费者线程。
=================END=================