001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Optional; 023import java.util.concurrent.ConcurrentNavigableMap; 024import java.util.concurrent.ThreadFactory; 025import org.apache.hadoop.hbase.HRegionLocation; 026import org.apache.hadoop.hbase.exceptions.DeserializationException; 027import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; 028import org.apache.hadoop.hbase.util.RetryCounter; 029import org.apache.hadoop.hbase.util.RetryCounterFactory; 030import org.apache.hadoop.hbase.zookeeper.ZKListener; 031import org.apache.hadoop.hbase.zookeeper.ZKUtil; 032import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 033import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.zookeeper.KeeperException; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 040 041/** 042 * A cache of meta region location metadata. Registers a listener on ZK to track changes to the 043 * meta table znodes. Clients are expected to retry if the meta information is stale. This class 044 * is thread-safe (a single instance of this class can be shared by multiple threads without race 045 * conditions). 046 */ 047@InterfaceAudience.Private 048public class MetaRegionLocationCache extends ZKListener { 049 050 private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class); 051 052 /** 053 * Maximum number of times we retry when ZK operation times out. 054 */ 055 private static final int MAX_ZK_META_FETCH_RETRIES = 10; 056 /** 057 * Sleep interval ms between ZK operation retries. 058 */ 059 private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000; 060 private static final int SLEEP_INTERVAL_MS_MAX = 10000; 061 private final RetryCounterFactory retryCounterFactory = 062 new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES); 063 064 /** 065 * Cached meta region locations indexed by replica ID. 066 * CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during 067 * client requests. Even though CopyOnWriteArrayMap copies the data structure for every write, 068 * that should be OK since the size of the list is often small and mutations are not too often 069 * and we do not need to block client requests while mutations are in progress. 070 */ 071 private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations; 072 073 private enum ZNodeOpType { 074 INIT, 075 CREATED, 076 CHANGED, 077 DELETED 078 } 079 080 public MetaRegionLocationCache(ZKWatcher zkWatcher) { 081 super(zkWatcher); 082 cachedMetaLocations = new CopyOnWriteArrayMap<>(); 083 watcher.registerListener(this); 084 // Populate the initial snapshot of data from meta znodes. 085 // This is needed because stand-by masters can potentially start after the initial znode 086 // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers 087 // are established. Subsequent updates are handled by the registered listener. Also, this runs 088 // in a separate thread in the background to not block master init. 089 ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build(); 090 RetryCounterFactory retryFactory = new RetryCounterFactory( 091 Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX); 092 threadFactory.newThread( 093 ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start(); 094 } 095 096 /** 097 * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers 098 * a watcher on base znode to check for any CREATE/DELETE events on the children. 099 * @param retryCounter controls the number of retries and sleep between retries. 100 */ 101 private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) { 102 List<String> znodes = null; 103 while (retryCounter.shouldRetry()) { 104 try { 105 znodes = watcher.getMetaReplicaNodesAndWatchChildren(); 106 break; 107 } catch (KeeperException ke) { 108 LOG.debug("Error populating initial meta locations", ke); 109 if (!retryCounter.shouldRetry()) { 110 // Retries exhausted and watchers not set. This is not a desirable state since the cache 111 // could remain stale forever. Propagate the exception. 112 watcher.abort("Error populating meta locations", ke); 113 return; 114 } 115 try { 116 retryCounter.sleepUntilNextRetry(); 117 } catch (InterruptedException ie) { 118 LOG.error("Interrupted while loading meta locations from ZK", ie); 119 Thread.currentThread().interrupt(); 120 return; 121 } 122 } 123 } 124 if (znodes == null || znodes.isEmpty()) { 125 // No meta znodes exist at this point but we registered a watcher on the base znode to listen 126 // for updates. They will be handled via nodeChildrenChanged(). 127 return; 128 } 129 if (znodes.size() == cachedMetaLocations.size()) { 130 // No new meta znodes got added. 131 return; 132 } 133 for (String znode: znodes) { 134 String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode); 135 updateMetaLocation(path, opType); 136 } 137 } 138 139 /** 140 * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for 141 * future updates. 142 * @param replicaId ReplicaID of the region. 143 * @return HRegionLocation for the meta replica. 144 * @throws KeeperException if there is any issue fetching/parsing the serialized data. 145 */ 146 private HRegionLocation getMetaRegionLocation(int replicaId) 147 throws KeeperException { 148 RegionState metaRegionState; 149 try { 150 byte[] data = ZKUtil.getDataAndWatch(watcher, 151 watcher.getZNodePaths().getZNodeForReplica(replicaId)); 152 metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId); 153 } catch (DeserializationException e) { 154 throw ZKUtil.convert(e); 155 } 156 return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName()); 157 } 158 159 private void updateMetaLocation(String path, ZNodeOpType opType) { 160 if (!isValidMetaZNode(path)) { 161 return; 162 } 163 LOG.debug("Updating meta znode for path {}: {}", path, opType.name()); 164 int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path); 165 RetryCounter retryCounter = retryCounterFactory.create(); 166 HRegionLocation location = null; 167 while (retryCounter.shouldRetry()) { 168 try { 169 if (opType == ZNodeOpType.DELETED) { 170 if (!ZKUtil.watchAndCheckExists(watcher, path)) { 171 // The path does not exist, we've set the watcher and we can break for now. 172 break; 173 } 174 // If it is a transient error and the node appears right away, we fetch the 175 // latest meta state. 176 } 177 location = getMetaRegionLocation(replicaId); 178 break; 179 } catch (KeeperException e) { 180 LOG.debug("Error getting meta location for path {}", path, e); 181 if (!retryCounter.shouldRetry()) { 182 LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e); 183 break; 184 } 185 try { 186 retryCounter.sleepUntilNextRetry(); 187 } catch (InterruptedException ie) { 188 Thread.currentThread().interrupt(); 189 return; 190 } 191 } 192 } 193 if (location == null) { 194 cachedMetaLocations.remove(replicaId); 195 return; 196 } 197 cachedMetaLocations.put(replicaId, location); 198 } 199 200 /** 201 * @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty. 202 * 203 */ 204 public Optional<List<HRegionLocation>> getMetaRegionLocations() { 205 ConcurrentNavigableMap<Integer, HRegionLocation> snapshot = 206 cachedMetaLocations.tailMap(cachedMetaLocations.firstKey()); 207 if (snapshot.isEmpty()) { 208 // This could be possible if the master has not successfully initialized yet or meta region 209 // is stuck in some weird state. 210 return Optional.empty(); 211 } 212 List<HRegionLocation> result = new ArrayList<>(); 213 // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying 214 // ArrayValueCollection does not implement toArray(). 215 snapshot.values().forEach(location -> result.add(location)); 216 return Optional.of(result); 217 } 218 219 /** 220 * Helper to check if the given 'path' corresponds to a meta znode. This listener is only 221 * interested in changes to meta znodes. 222 */ 223 private boolean isValidMetaZNode(String path) { 224 return watcher.getZNodePaths().isAnyMetaReplicaZNode(path); 225 } 226 227 @Override 228 public void nodeCreated(String path) { 229 updateMetaLocation(path, ZNodeOpType.CREATED); 230 } 231 232 @Override 233 public void nodeDeleted(String path) { 234 updateMetaLocation(path, ZNodeOpType.DELETED); 235 } 236 237 @Override 238 public void nodeDataChanged(String path) { 239 updateMetaLocation(path, ZNodeOpType.CHANGED); 240 } 241 242 @Override 243 public void nodeChildrenChanged(String path) { 244 if (!path.equals(watcher.getZNodePaths().baseZNode)) { 245 return; 246 } 247 loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED); 248 } 249}