博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper实现队列_Queue
阅读量:6710 次
发布时间:2019-06-25

本文共 6113 字,大约阅读时间需要 20 分钟。

hot3.png

zookeeper实现队列_Queue

根据zookeeper的官方文档改写的demo,加了详细的注释。

Queue.java

package com.usfot;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.nio.ByteBuffer;import java.util.List;/** * Created by liyanxin on 2015/3/17. */public class Queue implements Watcher {    private static final String addr = "127.0.0.1:2181";    private String root;    private ZooKeeper zk = null;    private Integer mutex;    /**     * Constructor of producer-consumer queue     *     * @param root     */    public Queue(String root) {        this.root = root;        try {            //连接zk服务器            zk = new ZooKeeper(addr, 10 * 10000, this);        } catch (IOException e) {            e.printStackTrace();        }        mutex = new Integer(-1);        // Create ZK node name        if (zk != null) {            try {                //建立根目录节点                Stat s = zk.exists(root, false);                if (s == null) {                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,                            CreateMode.PERSISTENT);                }            } catch (KeeperException e) {                System.out                        .println("Keeper exception when instantiating queue: "                                + e.toString());            } catch (InterruptedException e) {                System.out.println("Interrupted exception");            }        }    }    /**     * 当znode上事件触发,唤醒相应的等待线程     *     * @param event     */    public void process(WatchedEvent event) {        synchronized (mutex) {            //System.out.println("Process: " + event.getType());            mutex.notify();        }    }    /**     * Add element to the queue.     *     * @param i     * @return     */    boolean produce(int i) throws KeeperException, InterruptedException {        ByteBuffer b = ByteBuffer.allocate(4);        byte[] value;        // Add child with value i        b.putInt(i);        value = b.array();        zk.create(root + "/element" + i, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,                CreateMode.PERSISTENT);        System.out.println("=========Produce znode=========");        return true;    }    /**     * Remove first element from the queue.     *     * @return     * @throws KeeperException     * @throws InterruptedException     */    int consume() throws KeeperException, InterruptedException {        int retvalue = -1;        Stat stat = null;        // Get the first element available        while (true) {            synchronized (mutex) {                List
 list = zk.getChildren(root, true);                if (list.size() == 0) {                    System.out.println("Going to wait");                    mutex.wait();                } else {                    Integer min = new Integer(list.get(0).substring(7));                    for (String s : list) {                        Integer tempValue = new Integer(s.substring(7));                        //System.out.println("Temporary value: " + tempValue);                        if (tempValue < min) min = tempValue;                    }                    System.out.println("Temporary value: " + root + "/element" + min);                    byte[] b = zk.getData(root + "/element" + min,                            false, stat);                    zk.delete(root + "/element" + min, 0);                    ByteBuffer buffer = ByteBuffer.wrap(b);                    retvalue = buffer.getInt();                    return retvalue;                }            }        }    }}

QueueTest.java

package com.usfot;import org.apache.zookeeper.KeeperException;/** * 测试基于zookeeper的生产者和消费者队列 * 测试过程: * 一个线程充当生产者,一个线程充当消费者 * Created by liyanxin on 2015/3/17. */public class QueueTest {    public static void main(String args[]) {        Producer producer = new Producer(new Queue("/app1"));        //生产者线程启动        producer.start();        // 启动多个线程作为消费者,但在当前的分布式环境中,消费者之间会竞争某些资源,也是说任务不是        // 同步进行的,导致一个消费者想要得到某个节点的数据时,而这个节点却被另一个消费者删除了,        // 导致KeeperException$NoNodeException。线程是不安全的。//        for (int i = 0; i < 100; i++) {//            Consumer consumer = new Consumer(new Queue("/app1"));//            consumer.start();//        }        // 消费者线程        Consumer consumer = new Consumer(new Queue("/app1"));        consumer.start();    }}/** * 生产者线程 */class Producer extends Thread {    private Queue queue;    public Producer(Queue queue) {        this.queue = queue;    }    @Override    public void run() {        // 该生产者线程创建1000个znode        for (int i = 0; i < 100; i++) {            try {                queue.produce(i);                Thread.sleep(1000);            } catch (KeeperException e) {                e.printStackTrace();            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}/** * 消费者线程 */class Consumer extends Thread {    private Queue queue;    public Consumer(Queue queue) {        this.queue = queue;    }    // 消费者线程    // 不断的从zk服务器中读取znode,进行操作。    @Override    public void run() {        try {            while (true) {                int retvalue = queue.consume();                System.out.println("thread_name=" +                        Thread.currentThread().getName() + ":" + retvalue);                Thread.sleep(1000);            }        } catch (KeeperException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

运行结果也是我想象中的,如下,

...............................=========Produce znode=========Temporary value: /app1/element97thread_name=Thread-1:97=========Produce znode=========Temporary value: /app1/element98thread_name=Thread-1:98=========Produce znode=========Temporary value: /app1/element99thread_name=Thread-1:99Going to wait最后消费者线程Going to wait,只有当有新的znode创建后或事件发生后触发才会唤醒消费者线程。

=================END=================

转载于:https://my.oschina.net/xinxingegeya/blog/388301

你可能感兴趣的文章
分享B2B信息发布小技巧
查看>>
你不得不知道的Visual Studio 2012(3)- 创建Windows应用程序
查看>>
Linux环境下C语言模拟内存负载测试
查看>>
Cocos Creator中的动画支持技术
查看>>
“2012年度IT博客大赛”获奖感言--梦想、学习、坚持、自信、淡定
查看>>
年轻群体当道,哈弗F7如何赢得芳心?
查看>>
关于考核与面谈
查看>>
项目案例分享四:DC升级后Sysvol停止复制,日志报13508
查看>>
职场思想分享001 | 有多种选择才叫有能力
查看>>
ASP.NET MVC 5 - 验证编辑方法(Edit method)和编辑视图(Edit view)
查看>>
3D圣诞树源码[强力推荐]
查看>>
25 个超棒的 WordPress 主题(2012)
查看>>
Concurrent use of embedded Ruby in Java (using JRuby)
查看>>
基础才是重中之重~.net中的显式事务与隐式事务
查看>>
转载 - 通过设置P3P头来实现跨域访问COOKIE
查看>>
使用泛型创建只读集合
查看>>
SQL Server 中如何判断表是否存在
查看>>
delphi日期格式显示及文件打开方式小结
查看>>
最近看的一些东东
查看>>
POJ 3281 Dining(最大流)
查看>>