主页 > 游戏开发  > 

zookeeperWatcher

zookeeperWatcher

目录 Watcher的一些常识Watcher是什么Watcher怎么用Watcher特性 回顾回调&观察者模式&发布订阅模式Zookeeper 客户端/ 服务端 watcher机制流程说明getChildren 并注册watcher的例子流程说明

Watcher的一些常识 Watcher是什么

ZK中引入Watcher机制来实现分布式的通知功能。

ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中 服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)

Watcher怎么用

参考: blog.csdn.net/qq_26437925/article/details/145715160 中的一个例子代码

判断特定路径,添加了Watcher 对其中的结点删除事件做出对应处理。

zk.exists(preNodePath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { // 前一个节点删除了释放锁了,就唤醒本结点 synchronized (node) { node.notify(); } } } });

可以查看Watcher接口如下:

public class WatchedEvent { private final KeeperState keeperState; //用于记录Event发生时的zk状态(通知状态 private final EventType eventType; // 记录Event的类型 private String path; Watcher特性 一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。客户端串行执行:客户端watcher回调过程是一个串行同步的过程轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径

之所以这么说,其实可以在源码实现中找到:

org.apache.zookeeper.server.WatchManager#triggerWatch,此为真正出发watcher执行的逻辑, 后文将再次说明。

可以看到watcher执行一次就删除了,for循环逐一执行的。

Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; // 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的 synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } // 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process w.process(e); } return watchers; } 回顾回调&观察者模式&发布订阅模式 回调的思想 类A的a()方法调用类B的b()方法类B的b()方法执行完毕主动调用类A的callback()方法 观察者模式 发布订阅,对比 观察者模式 Zookeeper 客户端/ 服务端 watcher机制 流程说明 一般客户端调用exists/getData/getChildren注册监听,其中请求会封装为zookeeper内部的协议packet,添加到outgoingQueue队列中然后客户端内部的SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端客户端发给服务端了,等待服务端响应的packet结合放到了SendThread内部的pendingQueue队列中客户端会将watcher存储到WatchManager的watchTable和watch2Paths中,缓存一样; 服务端仅仅只是保存了当前连接的 ServerCnxn 对象(ServerCnxn是服务端与客户端进行网络交互的一个NIO接口,代表了客户端与服务端的连接) /** * Interface to a Server connection - represents a connection from a client * to the server. */ public abstract class ServerCnxn implements Stats, Watcher {
这样当指定的节点发生相关的事件时,即要处理此请求了,会判断是否有watcher,注册到WatchManager, 最后会调用WatchManager的triggerWatch方法触发相关的事件。triggerWatch就是客户端从之前WatchManager缓存取出wacther执行的过程

举例org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest

private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException { GetDataRequest getDataRequest = (GetDataRequest) request; String path = getDataRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null); Stat stat = new Stat(); byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null); return new GetDataResponse(b, stat); }

客户端有一个专门处理watcher时间的线程EventThread,其保持了一个待处理事件的队列。它根据传递的事件的类型和节点信息,从客户端的ZKWatcherManager中取出相关的watcher,将其添加到EventThread事件队列中,并在去run方法中不断取出watcher事件进行处理。

通过将节点信息和事件类型进行封装成为watchedevent,并查找到到对应节点的注册的watcher,然后调用watcher的回调方法process进行处理。

getChildren 并注册watcher的例子流程说明 /** * Return the list of the children of the node of the given path. * <p> * If the watch is non-null and the call is successful (no exception is thrown), * a watch will be left on the node with the given path. The watch willbe * triggered by a successful operation that deletes the node of the given * path or creates/delete a child under the node. * <p> * The list of children returned is not sorted and no guarantee is provided * as to its natural or lexical order. * <p> * A KeeperException with error code KeeperException.NoNode will be thrown * if no node with the given path exists. * * @param path * @param watcher explicit watcher * @return an unordered array of children of the node with the given path * @throws InterruptedException If the server transaction is interrupted. * @throws KeeperException If the server signals an error with a non-zero error code. * @throws IllegalArgumentException if an invalid path is specified */ public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new ChildWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getChildren); GetChildrenRequest request = new GetChildrenRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetChildrenResponse response = new GetChildrenResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } return response.getChildren(); }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // 客户端与服务端的网络传输 ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration); synchronized(packet) { while(!packet.finished) { packet.wait(); } return r; } } ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { ClientCnxn.Packet packet = null; LinkedList var11 = this.outgoingQueue; synchronized(this.outgoingQueue) { // 传输的对象都包装成Packet对象 packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (this.state.isAlive() && !this.closing) { if (h.getType() == -11) { this.closing = true; } // 放入发送队列中,等待发送 this.outgoingQueue.add(packet); } else { this.conLossPacket(packet); } } this.sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }

outgoingQueue的处理 服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理

case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath()); if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); // 返回children, // 这里根据客户端设置的是否有watch变量来传入watcher对象 // 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接) List<String> children = zks.getZKDatabase().getChildren( getChildrenRequest.getPath(), null, getChildrenRequest .getWatch() ? cnxn : null); rsp = new GetChildrenResponse(children); break; }

将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTable和watch2Paths中

public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } synchronized (n) { if (stat != null) { n.copyStat(stat); } List<String> children=new ArrayList<String>(n.getChildren()); if (watcher != null) { childWatches.addWatch(path, watcher); } return children; } } 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中 /** * This class services the outgoing request queue and generates the heart * beats. It also spawns the ReadThread. */ class SendThread extends ZooKeeperThread { private long lastPingSentNs; private final ClientCnxnSocket clientCnxnSocket; private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true; void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; } if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath pareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } eventThread.queueEvent( we ); return; } // If SASL authentication is currently in progress, construct and // send a response packet immediately, rather than queuing a // response as with other packets. if (tunnelAuthInProgress()) { GetSASLRequest request = new GetSASLRequest(); request.deserialize(bbia,"token"); zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this); return; } Packet packet; synchronized (pendingQueue) { if (pendingQueue.size() == 0) { throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); } packet = pendingQueue.remove(); } /* * Since requests are processed in order, we better get a response * to the first request! */ try { if (packet.requestHeader.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet ); } packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } if (packet.response != null && replyHdr.getErr() == 0) { packet.response.deserialize(bbia, "response"); } if (LOG.isDebugEnabled()) { LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet); } } finally { finishPacket(packet); } } private void finishPacket(Packet p) { int err = p.replyHeader.getErr(); if (p.watchRegistration != null) { p.watchRegistration.register(err); } // Add all the removed watch events to the event queue, so that the // clients will be notified with 'Data/Child WatchRemoved' event type. if (p.watchDeregistration != null) { Map<EventType, Set<Watcher>> materializedWatchers = null; try { materializedWatchers = p.watchDeregistration.unregister(err); for (Entry<EventType, Set<Watcher>> entry : materializedWatchers .entrySet()) { Set<Watcher> watchers = entry.getValue(); if (watchers.size() > 0) { queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey()); // ignore connectionloss when removing from local // session p.replyHeader.setErr(Code.OK.intValue()); } } } catch (KeeperException.NoWatcherException nwe) { LOG.error("Failed to find watcher!", nwe); p.replyHeader.setErr(nwe.code().intValue()); } catch (KeeperException ke) { LOG.error("Exception when removing watcher", ke); p.replyHeader.setErr(ke.code().intValue()); } } if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }

触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch

Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; // 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的 synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } // 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#process w.process(e); } return watchers; }
标签:

zookeeperWatcher由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“zookeeperWatcher