001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.zksyncer; 019 020import java.io.IOException; 021import java.util.Iterator; 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.Server; 028import org.apache.hadoop.hbase.util.Threads; 029import org.apache.hadoop.hbase.zookeeper.ZKListener; 030import org.apache.hadoop.hbase.zookeeper.ZKUtil; 031import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.zookeeper.CreateMode; 034import org.apache.zookeeper.KeeperException; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if 040 * changed 041 * <p/> 042 * The target znode(s) is given through {@link #getPathsToWatch()} method 043 */ 044@InterfaceAudience.Private 045public abstract class ClientZKSyncer extends ZKListener { 046 private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class); 047 private final Server server; 048 private final ZKWatcher clientZkWatcher; 049 050 /** 051 * Used to store the newest data which we want to sync to client zk. 052 * <p/> 053 * For meta location, since we may reduce the replica number, so here we add a {@code delete} flag 054 * to tell the updater delete the znode on client zk and quit. 055 */ 056 private static final class ZKData { 057 058 byte[] data; 059 060 boolean delete = false; 061 062 synchronized void set(byte[] data) { 063 this.data = data; 064 notifyAll(); 065 } 066 067 synchronized byte[] get() throws InterruptedException { 068 while (!delete && data == null) { 069 wait(); 070 } 071 byte[] d = data; 072 data = null; 073 return d; 074 } 075 076 synchronized void delete() { 077 this.delete = true; 078 notifyAll(); 079 } 080 081 synchronized boolean isDeleted() { 082 return delete; 083 } 084 } 085 086 // We use queues and daemon threads to synchronize the data to client ZK cluster 087 // to avoid blocking the single event thread for watchers 088 private final ConcurrentMap<String, ZKData> queues; 089 090 public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { 091 super(watcher); 092 this.server = server; 093 this.clientZkWatcher = clientZkWatcher; 094 this.queues = new ConcurrentHashMap<>(); 095 } 096 097 private void startNewSyncThread(String path) { 098 ZKData zkData = new ZKData(); 099 queues.put(path, zkData); 100 Thread updater = new ClientZkUpdater(path, zkData); 101 updater.setDaemon(true); 102 updater.start(); 103 watchAndCheckExists(path); 104 } 105 106 /** 107 * Starts the syncer 108 * @throws KeeperException if error occurs when trying to create base nodes on client ZK 109 */ 110 public void start() throws KeeperException { 111 LOG.debug("Starting " + getClass().getSimpleName()); 112 this.watcher.registerListener(this); 113 // create base znode on remote ZK 114 ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode); 115 // set znodes for client ZK 116 Set<String> paths = getPathsToWatch(); 117 LOG.debug("ZNodes to watch: {}", paths); 118 // initialize queues and threads 119 for (String path : paths) { 120 startNewSyncThread(path); 121 } 122 } 123 124 private void watchAndCheckExists(String node) { 125 try { 126 if (ZKUtil.watchAndCheckExists(watcher, node)) { 127 byte[] data = ZKUtil.getDataAndWatch(watcher, node); 128 if (data != null) { 129 // put the data into queue 130 upsertQueue(node, data); 131 } else { 132 // It existed but now does not, should has been tracked by our watcher, ignore 133 LOG.debug("Found no data from " + node); 134 watchAndCheckExists(node); 135 } 136 } else { 137 // cleanup stale ZNodes on client ZK to avoid invalid requests to server 138 ZKUtil.deleteNodeFailSilent(clientZkWatcher, node); 139 } 140 } catch (KeeperException e) { 141 server.abort("Unexpected exception during initialization, aborting", e); 142 } 143 } 144 145 /** 146 * Update the value of the single element in queue if any, or else insert. 147 * <p/> 148 * We only need to synchronize the latest znode value to client ZK rather than synchronize each 149 * time 150 * @param data the data to write to queue 151 */ 152 private void upsertQueue(String node, byte[] data) { 153 ZKData zkData = queues.get(node); 154 if (zkData != null) { 155 zkData.set(data); 156 } 157 } 158 159 /** 160 * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when 161 * modifying this method 162 * @param node the znode to set on client ZK 163 * @param data the data to set to client ZK 164 * @throws InterruptedException if the thread is interrupted during process 165 */ 166 private void setDataForClientZkUntilSuccess(String node, byte[] data) 167 throws InterruptedException { 168 boolean create = false; 169 while (!server.isStopped()) { 170 try { 171 LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher); 172 if (create) { 173 ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT); 174 } else { 175 ZKUtil.setData(clientZkWatcher, node, data); 176 } 177 break; 178 } catch (KeeperException e) { 179 LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e); 180 if (e.code() == KeeperException.Code.SESSIONEXPIRED) { 181 reconnectAfterExpiration(); 182 } 183 if (e.code() == KeeperException.Code.NONODE) { 184 create = true; 185 } 186 if (e.code() == KeeperException.Code.NODEEXISTS) { 187 create = false; 188 } 189 } 190 Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS); 191 } 192 } 193 194 private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException { 195 while (!server.isStopped()) { 196 LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher); 197 try { 198 ZKUtil.deleteNode(clientZkWatcher, node); 199 break; 200 } catch (KeeperException e) { 201 if (e.code() == KeeperException.Code.NONODE) { 202 LOG.debug("Node is already deleted, give up", e); 203 break; 204 } 205 LOG.debug("Failed to delete node from client ZK, will retry later", e); 206 if (e.code() == KeeperException.Code.SESSIONEXPIRED) { 207 reconnectAfterExpiration(); 208 } 209 } 210 } 211 } 212 213 private final void reconnectAfterExpiration() throws InterruptedException { 214 LOG.warn("ZK session expired or lost. Retry a new connection..."); 215 try { 216 clientZkWatcher.reconnectAfterExpiration(); 217 } catch (IOException | KeeperException e) { 218 LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e); 219 } 220 } 221 222 private void getDataAndWatch(String path) { 223 try { 224 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 225 upsertQueue(path, data); 226 } catch (KeeperException e) { 227 LOG.warn("Unexpected exception handling nodeCreated event", e); 228 } 229 } 230 231 private void removeQueue(String path) { 232 ZKData zkData = queues.remove(path); 233 if (zkData != null) { 234 zkData.delete(); 235 } 236 } 237 238 @Override 239 public void nodeCreated(String path) { 240 if (validate(path)) { 241 getDataAndWatch(path); 242 } else { 243 removeQueue(path); 244 } 245 } 246 247 @Override 248 public void nodeDataChanged(String path) { 249 nodeCreated(path); 250 } 251 252 @Override 253 public synchronized void nodeDeleted(String path) { 254 if (validate(path)) { 255 try { 256 if (ZKUtil.watchAndCheckExists(watcher, path)) { 257 getDataAndWatch(path); 258 } 259 } catch (KeeperException e) { 260 LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e); 261 } 262 } else { 263 removeQueue(path); 264 } 265 } 266 267 /** 268 * Validate whether a znode path is watched by us 269 * @param path the path to validate 270 * @return true if the znode is watched by us 271 */ 272 protected abstract boolean validate(String path); 273 274 /** Returns the zk path(s) to watch */ 275 protected abstract Set<String> getPathsToWatch(); 276 277 protected final void refreshWatchingList() { 278 Set<String> newPaths = getPathsToWatch(); 279 LOG.debug("New ZNodes to watch: {}", newPaths); 280 Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator(); 281 // stop unused syncers 282 while (iter.hasNext()) { 283 Map.Entry<String, ZKData> entry = iter.next(); 284 if (!newPaths.contains(entry.getKey())) { 285 iter.remove(); 286 entry.getValue().delete(); 287 } 288 } 289 // start new syncers 290 for (String newPath : newPaths) { 291 if (!queues.containsKey(newPath)) { 292 startNewSyncThread(newPath); 293 } 294 } 295 } 296 297 /** 298 * Thread to synchronize znode data to client ZK cluster 299 */ 300 private final class ClientZkUpdater extends Thread { 301 private final String znode; 302 private final ZKData zkData; 303 304 public ClientZkUpdater(String znode, ZKData zkData) { 305 this.znode = znode; 306 this.zkData = zkData; 307 setName("ClientZKUpdater-" + znode); 308 } 309 310 @Override 311 public void run() { 312 LOG.debug("Client zk updater for znode {} started", znode); 313 while (!server.isStopped()) { 314 try { 315 byte[] data = zkData.get(); 316 if (data != null) { 317 setDataForClientZkUntilSuccess(znode, data); 318 } else { 319 if (zkData.isDeleted()) { 320 deleteDataForClientZkUntilSuccess(znode); 321 break; 322 } 323 } 324 } catch (InterruptedException e) { 325 LOG.debug("Interrupted while checking whether need to update meta location to client zk"); 326 Thread.currentThread().interrupt(); 327 break; 328 } 329 } 330 LOG.debug("Client zk updater for znode {} stopped", znode); 331 } 332 } 333}