001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.master.zksyncer; 020 021import java.io.IOException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.util.Threads; 030import org.apache.hadoop.hbase.zookeeper.ZKListener; 031import org.apache.hadoop.hbase.zookeeper.ZKUtil; 032import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.zookeeper.CreateMode; 035import org.apache.zookeeper.KeeperException; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if 041 * changed 042 * <p/> 043 * The target znode(s) is given through {@link #getPathsToWatch()} method 044 */ 045@InterfaceAudience.Private 046public abstract class ClientZKSyncer extends ZKListener { 047 private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class); 048 private final Server server; 049 private final ZKWatcher clientZkWatcher; 050 051 /** 052 * Used to store the newest data which we want to sync to client zk. 053 * <p/> 054 * For meta location, since we may reduce the replica number, so here we add a {@code delete} flag 055 * to tell the updater delete the znode on client zk and quit. 056 */ 057 private static final class ZKData { 058 059 byte[] data; 060 061 boolean delete = false; 062 063 synchronized void set(byte[] data) { 064 this.data = data; 065 notifyAll(); 066 } 067 068 synchronized byte[] get() throws InterruptedException { 069 while (!delete && data == null) { 070 wait(); 071 } 072 byte[] d = data; 073 data = null; 074 return d; 075 } 076 077 synchronized void delete() { 078 this.delete = true; 079 notifyAll(); 080 } 081 082 synchronized boolean isDeleted() { 083 return delete; 084 } 085 } 086 087 // We use queues and daemon threads to synchronize the data to client ZK cluster 088 // to avoid blocking the single event thread for watchers 089 private final ConcurrentMap<String, ZKData> queues; 090 091 public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) { 092 super(watcher); 093 this.server = server; 094 this.clientZkWatcher = clientZkWatcher; 095 this.queues = new ConcurrentHashMap<>(); 096 } 097 098 private void startNewSyncThread(String path) { 099 ZKData zkData = new ZKData(); 100 queues.put(path, zkData); 101 Thread updater = new ClientZkUpdater(path, zkData); 102 updater.setDaemon(true); 103 updater.start(); 104 watchAndCheckExists(path); 105 } 106 107 /** 108 * Starts the syncer 109 * @throws KeeperException if error occurs when trying to create base nodes on client ZK 110 */ 111 public void start() throws KeeperException { 112 LOG.debug("Starting " + getClass().getSimpleName()); 113 this.watcher.registerListener(this); 114 // create base znode on remote ZK 115 ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode); 116 // set znodes for client ZK 117 Set<String> paths = getPathsToWatch(); 118 LOG.debug("ZNodes to watch: {}", paths); 119 // initialize queues and threads 120 for (String path : paths) { 121 startNewSyncThread(path); 122 } 123 } 124 125 private void watchAndCheckExists(String node) { 126 try { 127 if (ZKUtil.watchAndCheckExists(watcher, node)) { 128 byte[] data = ZKUtil.getDataAndWatch(watcher, node); 129 if (data != null) { 130 // put the data into queue 131 upsertQueue(node, data); 132 } else { 133 // It existed but now does not, should has been tracked by our watcher, ignore 134 LOG.debug("Found no data from " + node); 135 watchAndCheckExists(node); 136 } 137 } else { 138 // cleanup stale ZNodes on client ZK to avoid invalid requests to server 139 ZKUtil.deleteNodeFailSilent(clientZkWatcher, node); 140 } 141 } catch (KeeperException e) { 142 server.abort("Unexpected exception during initialization, aborting", e); 143 } 144 } 145 146 /** 147 * Update the value of the single element in queue if any, or else insert. 148 * <p/> 149 * We only need to synchronize the latest znode value to client ZK rather than synchronize each 150 * time 151 * @param data the data to write to queue 152 */ 153 private void upsertQueue(String node, byte[] data) { 154 ZKData zkData = queues.get(node); 155 if (zkData != null) { 156 zkData.set(data); 157 } 158 } 159 160 /** 161 * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when 162 * modifying this method 163 * @param node the znode to set on client ZK 164 * @param data the data to set to client ZK 165 * @throws InterruptedException if the thread is interrupted during process 166 */ 167 private void setDataForClientZkUntilSuccess(String node, byte[] data) 168 throws InterruptedException { 169 boolean create = false; 170 while (!server.isStopped()) { 171 try { 172 LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher); 173 if (create) { 174 ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT); 175 } else { 176 ZKUtil.setData(clientZkWatcher, node, data); 177 } 178 break; 179 } catch (KeeperException e) { 180 LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e); 181 if (e.code() == KeeperException.Code.SESSIONEXPIRED) { 182 reconnectAfterExpiration(); 183 } 184 if (e.code() == KeeperException.Code.NONODE) { 185 create = true; 186 } 187 if (e.code() == KeeperException.Code.NODEEXISTS) { 188 create = false; 189 } 190 } 191 Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS); 192 } 193 } 194 195 private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException { 196 while (!server.isStopped()) { 197 LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher); 198 try { 199 ZKUtil.deleteNode(clientZkWatcher, node); 200 } catch (KeeperException e) { 201 LOG.debug("Failed to delete node from client ZK, will retry later", e); 202 if (e.code() == KeeperException.Code.SESSIONEXPIRED) { 203 reconnectAfterExpiration(); 204 } 205 206 } 207 } 208 } 209 210 private final void reconnectAfterExpiration() throws InterruptedException { 211 LOG.warn("ZK session expired or lost. Retry a new connection..."); 212 try { 213 clientZkWatcher.reconnectAfterExpiration(); 214 } catch (IOException | KeeperException e) { 215 LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e); 216 } 217 } 218 219 private void getDataAndWatch(String path) { 220 try { 221 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 222 upsertQueue(path, data); 223 } catch (KeeperException e) { 224 LOG.warn("Unexpected exception handling nodeCreated event", e); 225 } 226 } 227 228 private void removeQueue(String path) { 229 ZKData zkData = queues.remove(path); 230 if (zkData != null) { 231 zkData.delete(); 232 } 233 } 234 235 @Override 236 public void nodeCreated(String path) { 237 if (validate(path)) { 238 getDataAndWatch(path); 239 } else { 240 removeQueue(path); 241 } 242 } 243 244 @Override 245 public void nodeDataChanged(String path) { 246 nodeCreated(path); 247 } 248 249 @Override 250 public synchronized void nodeDeleted(String path) { 251 if (validate(path)) { 252 try { 253 if (ZKUtil.watchAndCheckExists(watcher, path)) { 254 getDataAndWatch(path); 255 } 256 } catch (KeeperException e) { 257 LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e); 258 } 259 } else { 260 removeQueue(path); 261 } 262 } 263 264 /** 265 * Validate whether a znode path is watched by us 266 * @param path the path to validate 267 * @return true if the znode is watched by us 268 */ 269 protected abstract boolean validate(String path); 270 271 /** 272 * @return the zk path(s) to watch 273 */ 274 protected abstract Set<String> getPathsToWatch(); 275 276 protected final void refreshWatchingList() { 277 Set<String> newPaths = getPathsToWatch(); 278 LOG.debug("New ZNodes to watch: {}", newPaths); 279 Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator(); 280 // stop unused syncers 281 while (iter.hasNext()) { 282 Map.Entry<String, ZKData> entry = iter.next(); 283 if (!newPaths.contains(entry.getKey())) { 284 iter.remove(); 285 entry.getValue().delete(); 286 } 287 } 288 // start new syncers 289 for (String newPath : newPaths) { 290 if (!queues.containsKey(newPath)) { 291 startNewSyncThread(newPath); 292 } 293 } 294 } 295 296 /** 297 * Thread to synchronize znode data to client ZK cluster 298 */ 299 private final class ClientZkUpdater extends Thread { 300 private final String znode; 301 private final ZKData zkData; 302 303 public ClientZkUpdater(String znode, ZKData zkData) { 304 this.znode = znode; 305 this.zkData = zkData; 306 setName("ClientZKUpdater-" + znode); 307 } 308 309 @Override 310 public void run() { 311 LOG.debug("Client zk updater for znode {} started", znode); 312 while (!server.isStopped()) { 313 try { 314 byte[] data = zkData.get(); 315 if (data != null) { 316 setDataForClientZkUntilSuccess(znode, data); 317 } else { 318 if (zkData.isDeleted()) { 319 deleteDataForClientZkUntilSuccess(znode); 320 break; 321 } 322 } 323 } catch (InterruptedException e) { 324 LOG.debug("Interrupted while checking whether need to update meta location to client zk"); 325 Thread.currentThread().interrupt(); 326 break; 327 } 328 } 329 LOG.debug("Client zk updater for znode {} stopped", znode); 330 } 331 } 332}