001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Optional;
023import java.util.concurrent.ConcurrentNavigableMap;
024import java.util.concurrent.ThreadFactory;
025import org.apache.hadoop.hbase.HRegionLocation;
026import org.apache.hadoop.hbase.exceptions.DeserializationException;
027import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounterFactory;
030import org.apache.hadoop.hbase.zookeeper.ZKListener;
031import org.apache.hadoop.hbase.zookeeper.ZKUtil;
032import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
033import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.zookeeper.KeeperException;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
040
041/**
042 * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
043 * meta table znodes. Clients are expected to retry if the meta information is stale. This class
044 * is thread-safe (a single instance of this class can be shared by multiple threads without race
045 * conditions).
046 */
047@InterfaceAudience.Private
048public class MetaRegionLocationCache extends ZKListener {
049
050  private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);
051
052  /**
053   * Maximum number of times we retry when ZK operation times out.
054   */
055  private static final int MAX_ZK_META_FETCH_RETRIES = 10;
056  /**
057   * Sleep interval ms between ZK operation retries.
058   */
059  private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;
060  private static final int SLEEP_INTERVAL_MS_MAX = 10000;
061  private final RetryCounterFactory retryCounterFactory =
062      new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);
063
064  /**
065   * Cached meta region locations indexed by replica ID.
066   * CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during
067   * client requests. Even though CopyOnWriteArrayMap copies the data structure for every write,
068   * that should be OK since the size of the list is often small and mutations are not too often
069   * and we do not need to block client requests while mutations are in progress.
070   */
071  private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations;
072
073  private enum ZNodeOpType {
074    INIT,
075    CREATED,
076    CHANGED,
077    DELETED
078  }
079
080  public MetaRegionLocationCache(ZKWatcher zkWatcher) {
081    super(zkWatcher);
082    cachedMetaLocations = new CopyOnWriteArrayMap<>();
083    watcher.registerListener(this);
084    // Populate the initial snapshot of data from meta znodes.
085    // This is needed because stand-by masters can potentially start after the initial znode
086    // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
087    // are established. Subsequent updates are handled by the registered listener. Also, this runs
088    // in a separate thread in the background to not block master init.
089    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
090    RetryCounterFactory retryFactory = new RetryCounterFactory(
091        Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
092    threadFactory.newThread(
093      ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
094  }
095
096  /**
097   * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
098   * a watcher on base znode to check for any CREATE/DELETE events on the children.
099   * @param retryCounter controls the number of retries and sleep between retries.
100   */
101  private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
102    List<String> znodes = null;
103    while (retryCounter.shouldRetry()) {
104      try {
105        znodes = watcher.getMetaReplicaNodesAndWatchChildren();
106        break;
107      } catch (KeeperException ke) {
108        LOG.debug("Error populating initial meta locations", ke);
109        if (!retryCounter.shouldRetry()) {
110          // Retries exhausted and watchers not set. This is not a desirable state since the cache
111          // could remain stale forever. Propagate the exception.
112          watcher.abort("Error populating meta locations", ke);
113          return;
114        }
115        try {
116          retryCounter.sleepUntilNextRetry();
117        } catch (InterruptedException ie) {
118          LOG.error("Interrupted while loading meta locations from ZK", ie);
119          Thread.currentThread().interrupt();
120          return;
121        }
122      }
123    }
124    if (znodes == null || znodes.isEmpty()) {
125      // No meta znodes exist at this point but we registered a watcher on the base znode to listen
126      // for updates. They will be handled via nodeChildrenChanged().
127      return;
128    }
129    if (znodes.size() == cachedMetaLocations.size()) {
130      // No new meta znodes got added.
131      return;
132    }
133    for (String znode: znodes) {
134      String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
135      updateMetaLocation(path, opType);
136    }
137  }
138
139  /**
140   * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for
141   * future updates.
142   * @param replicaId ReplicaID of the region.
143   * @return HRegionLocation for the meta replica.
144   * @throws KeeperException if there is any issue fetching/parsing the serialized data.
145   */
146  private HRegionLocation getMetaRegionLocation(int replicaId)
147      throws KeeperException {
148    RegionState metaRegionState;
149    try {
150      byte[] data = ZKUtil.getDataAndWatch(watcher,
151          watcher.getZNodePaths().getZNodeForReplica(replicaId));
152      metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
153    } catch (DeserializationException e) {
154      throw ZKUtil.convert(e);
155    }
156    return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
157  }
158
159  private void updateMetaLocation(String path, ZNodeOpType opType) {
160    if (!isValidMetaPath(path)) {
161      return;
162    }
163    LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
164    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
165    RetryCounter retryCounter = retryCounterFactory.create();
166    HRegionLocation location = null;
167    while (retryCounter.shouldRetry()) {
168      try {
169        if (opType == ZNodeOpType.DELETED) {
170          if (!ZKUtil.watchAndCheckExists(watcher, path)) {
171            // The path does not exist, we've set the watcher and we can break for now.
172            break;
173          }
174          // If it is a transient error and the node appears right away, we fetch the
175          // latest meta state.
176        }
177        location = getMetaRegionLocation(replicaId);
178        break;
179      } catch (KeeperException e) {
180        LOG.debug("Error getting meta location for path {}", path, e);
181        if (!retryCounter.shouldRetry()) {
182          LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
183          break;
184        }
185        try {
186          retryCounter.sleepUntilNextRetry();
187        } catch (InterruptedException ie) {
188          Thread.currentThread().interrupt();
189          return;
190        }
191      }
192    }
193    if (location == null) {
194      cachedMetaLocations.remove(replicaId);
195      return;
196    }
197    cachedMetaLocations.put(replicaId, location);
198  }
199
200  /**
201   * @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
202   *
203   */
204  public Optional<List<HRegionLocation>> getMetaRegionLocations() {
205    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
206        cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
207    if (snapshot.isEmpty()) {
208      // This could be possible if the master has not successfully initialized yet or meta region
209      // is stuck in some weird state.
210      return Optional.empty();
211    }
212    List<HRegionLocation> result = new ArrayList<>();
213    // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
214    // ArrayValueCollection does not implement toArray().
215    snapshot.values().forEach(location -> result.add(location));
216    return Optional.of(result);
217  }
218
219  /**
220   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
221   * interested in changes to meta znodes.
222   */
223  private boolean isValidMetaPath(String path) {
224    return watcher.getZNodePaths().isMetaZNodePath(path);
225  }
226
227  @Override
228  public void nodeCreated(String path) {
229    updateMetaLocation(path, ZNodeOpType.CREATED);
230  }
231
232  @Override
233  public void nodeDeleted(String path) {
234    updateMetaLocation(path, ZNodeOpType.DELETED);
235  }
236
237  @Override
238  public void nodeDataChanged(String path) {
239    updateMetaLocation(path, ZNodeOpType.CHANGED);
240  }
241
242  @Override
243  public void nodeChildrenChanged(String path) {
244    if (!path.equals(watcher.getZNodePaths().baseZNode)) {
245      return;
246    }
247    loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
248  }
249}