001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.zksyncer; 020 021import java.io.IOException; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.concurrent.ArrayBlockingQueue; 026import java.util.concurrent.BlockingQueue; 027 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.Server; 030import org.apache.hadoop.hbase.util.Threads; 031import org.apache.hadoop.hbase.zookeeper.ZKListener; 032import org.apache.hadoop.hbase.zookeeper.ZKUtil; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.zookeeper.CreateMode; 036import org.apache.zookeeper.KeeperException; 037 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if 043 * changed 044 * <p/> 045 * The target znode(s) is given through {@link #getNodesToWatch()} method 046 */ 047@InterfaceAudience.Private 048public abstract class ClientZKSyncer extends ZKListener { 049 private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class); 050 private final Server server; 051 private final ZKWatcher clientZkWatcher; 052 // We use queues and daemon threads to synchronize the data to client ZK cluster 053 // to avoid blocking the single event thread for watchers 054 private final Map<String, BlockingQueue<byte[]>> queues; 055 056 public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { 057 super(watcher); 058 this.server = server; 059 this.clientZkWatcher = clientZkWatcher; 060 this.queues = new HashMap<>(); 061 } 062 063 /** 064 * Starts the syncer 065 * @throws KeeperException if error occurs when trying to create base nodes on client ZK 066 */ 067 public void start() throws KeeperException { 068 LOG.debug("Starting " + getClass().getSimpleName()); 069 this.watcher.registerListener(this); 070 // create base znode on remote ZK 071 ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode); 072 // set meta znodes for client ZK 073 Collection<String> nodes = getNodesToWatch(); 074 LOG.debug("Znodes to watch: " + nodes); 075 // initialize queues and threads 076 for (String node : nodes) { 077 BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1); 078 queues.put(node, queue); 079 Thread updater = new ClientZkUpdater(node, queue); 080 updater.setDaemon(true); 081 updater.start(); 082 watchAndCheckExists(node); 083 } 084 } 085 086 private void watchAndCheckExists(String node) { 087 try { 088 if (ZKUtil.watchAndCheckExists(watcher, node)) { 089 byte[] data = ZKUtil.getDataAndWatch(watcher, node); 090 if (data != null) { 091 // put the data into queue 092 upsertQueue(node, data); 093 } else { 094 // It existed but now does not, should has been tracked by our watcher, ignore 095 LOG.debug("Found no data from " + node); 096 watchAndCheckExists(node); 097 } 098 } else { 099 // cleanup stale ZNodes on client ZK to avoid invalid requests to server 100 ZKUtil.deleteNodeFailSilent(clientZkWatcher, node); 101 } 102 } catch (KeeperException e) { 103 server.abort("Unexpected exception during initialization, aborting", e); 104 } 105 } 106 107 /** 108 * Update the value of the single element in queue if any, or else insert. 109 * <p/> 110 * We only need to synchronize the latest znode value to client ZK rather than synchronize each 111 * time 112 * @param data the data to write to queue 113 */ 114 private void upsertQueue(String node, byte[] data) { 115 BlockingQueue<byte[]> queue = queues.get(node); 116 synchronized (queue) { 117 queue.poll(); 118 queue.offer(data); 119 } 120 } 121 122 /** 123 * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when 124 * modifying this method 125 * @param node the znode to set on client ZK 126 * @param data the data to set to client ZK 127 * @throws InterruptedException if the thread is interrupted during process 128 */ 129 private final void setDataForClientZkUntilSuccess(String node, byte[] data) 130 throws InterruptedException { 131 while (!server.isStopped()) { 132 try { 133 LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher); 134 ZKUtil.setData(clientZkWatcher, node, data); 135 break; 136 } catch (KeeperException.NoNodeException nne) { 137 // Node doesn't exist, create it and set value 138 try { 139 ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT); 140 break; 141 } catch (KeeperException.ConnectionLossException 142 | KeeperException.SessionExpiredException ee) { 143 reconnectAfterExpiration(); 144 } catch (KeeperException e) { 145 LOG.warn( 146 "Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later"); 147 } 148 } catch (KeeperException.ConnectionLossException 149 | KeeperException.SessionExpiredException ee) { 150 reconnectAfterExpiration(); 151 } catch (KeeperException e) { 152 LOG.debug("Failed to set data to client ZK, will retry later", e); 153 } 154 Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS); 155 } 156 } 157 158 private final void reconnectAfterExpiration() throws InterruptedException { 159 LOG.warn("ZK session expired or lost. Retry a new connection..."); 160 try { 161 clientZkWatcher.reconnectAfterExpiration(); 162 } catch (IOException | KeeperException e) { 163 LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e); 164 } 165 } 166 167 @Override 168 public void nodeCreated(String path) { 169 if (!validate(path)) { 170 return; 171 } 172 try { 173 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 174 upsertQueue(path, data); 175 } catch (KeeperException e) { 176 LOG.warn("Unexpected exception handling nodeCreated event", e); 177 } 178 } 179 180 @Override 181 public void nodeDataChanged(String path) { 182 if (validate(path)) { 183 nodeCreated(path); 184 } 185 } 186 187 @Override 188 public synchronized void nodeDeleted(String path) { 189 if (validate(path)) { 190 try { 191 if (ZKUtil.watchAndCheckExists(watcher, path)) { 192 nodeCreated(path); 193 } 194 } catch (KeeperException e) { 195 LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e); 196 } 197 } 198 } 199 200 /** 201 * Validate whether a znode path is watched by us 202 * @param path the path to validate 203 * @return true if the znode is watched by us 204 */ 205 abstract boolean validate(String path); 206 207 /** 208 * @return the znode(s) to watch 209 */ 210 abstract Collection<String> getNodesToWatch(); 211 212 /** 213 * Thread to synchronize znode data to client ZK cluster 214 */ 215 class ClientZkUpdater extends Thread { 216 final String znode; 217 final BlockingQueue<byte[]> queue; 218 219 public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) { 220 this.znode = znode; 221 this.queue = queue; 222 setName("ClientZKUpdater-" + znode); 223 } 224 225 @Override 226 public void run() { 227 while (!server.isStopped()) { 228 try { 229 byte[] data = queue.take(); 230 setDataForClientZkUntilSuccess(znode, data); 231 } catch (InterruptedException e) { 232 if (LOG.isDebugEnabled()) { 233 LOG.debug( 234 "Interrupted while checking whether need to update meta location to client zk"); 235 } 236 Thread.currentThread().interrupt(); 237 break; 238 } 239 } 240 } 241 } 242}