001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Optional;
023import java.util.concurrent.ConcurrentNavigableMap;
024import java.util.concurrent.ThreadFactory;
025import org.apache.hadoop.hbase.exceptions.DeserializationException;
026import org.apache.hadoop.hbase.master.RegionState;
027import org.apache.hadoop.hbase.trace.TraceUtil;
028import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
029import org.apache.hadoop.hbase.util.RetryCounter;
030import org.apache.hadoop.hbase.util.RetryCounterFactory;
031import org.apache.hadoop.hbase.zookeeper.ZKListener;
032import org.apache.hadoop.hbase.zookeeper.ZKUtil;
033import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
034import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.zookeeper.KeeperException;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
041
042import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
043
044/**
045 * A cache of meta region location metadata. Registers a listener on ZK to track changes to the meta
046 * table znodes. Clients are expected to retry if the meta information is stale. This class is
047 * thread-safe (a single instance of this class can be shared by multiple threads without race
048 * conditions).
049 */
050@InterfaceAudience.Private
051public class MetaRegionLocationCache extends ZKListener {
052
053  private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);
054
055  /**
056   * Maximum number of times we retry when ZK operation times out.
057   */
058  private static final int MAX_ZK_META_FETCH_RETRIES = 10;
059  /**
060   * Sleep interval ms between ZK operation retries.
061   */
062  private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;
063  private static final int SLEEP_INTERVAL_MS_MAX = 10000;
064  private final RetryCounterFactory retryCounterFactory =
065    new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);
066
067  /**
068   * Cached meta region locations indexed by replica ID. CopyOnWriteArrayMap ensures synchronization
069   * during updates and a consistent snapshot during client requests. Even though
070   * CopyOnWriteArrayMap copies the data structure for every write, that should be OK since the size
071   * of the list is often small and mutations are not too often and we do not need to block client
072   * requests while mutations are in progress.
073   */
074  private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations;
075
076  private enum ZNodeOpType {
077    INIT,
078    CREATED,
079    CHANGED,
080    DELETED
081  }
082
083  public MetaRegionLocationCache(ZKWatcher zkWatcher) {
084    super(zkWatcher);
085    cachedMetaLocations = new CopyOnWriteArrayMap<>();
086    watcher.registerListener(this);
087    // Populate the initial snapshot of data from meta znodes.
088    // This is needed because stand-by masters can potentially start after the initial znode
089    // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
090    // are established. Subsequent updates are handled by the registered listener. Also, this runs
091    // in a separate thread in the background to not block master init.
092    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
093    RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE,
094      SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
095    threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT))
096      .start();
097  }
098
099  /**
100   * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
101   * a watcher on base znode to check for any CREATE/DELETE events on the children.
102   * @param retryCounter controls the number of retries and sleep between retries.
103   */
104  private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
105    TraceUtil.trace(() -> {
106      List<String> znodes = null;
107      while (retryCounter.shouldRetry()) {
108        try {
109          znodes = watcher.getMetaReplicaNodesAndWatchChildren();
110          break;
111        } catch (KeeperException ke) {
112          LOG.debug("Error populating initial meta locations", ke);
113          if (!retryCounter.shouldRetry()) {
114            // Retries exhausted and watchers not set. This is not a desirable state since the cache
115            // could remain stale forever. Propagate the exception.
116            watcher.abort("Error populating meta locations", ke);
117            return;
118          }
119          try {
120            retryCounter.sleepUntilNextRetry();
121          } catch (InterruptedException ie) {
122            LOG.error("Interrupted while loading meta locations from ZK", ie);
123            Thread.currentThread().interrupt();
124            return;
125          }
126        }
127      }
128      if (znodes == null || znodes.isEmpty()) {
129        // No meta znodes exist at this point but we registered a watcher on the base znode to
130        // listen for updates. They will be handled via nodeChildrenChanged().
131        return;
132      }
133      if (znodes.size() == cachedMetaLocations.size()) {
134        // No new meta znodes got added.
135        return;
136      }
137      for (String znode : znodes) {
138        String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
139        updateMetaLocation(path, opType);
140      }
141    }, "MetaRegionLocationCache.loadMetaLocationsFromZk");
142  }
143
144  /**
145   * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for future
146   * updates.
147   * @param replicaId ReplicaID of the region.
148   * @return HRegionLocation for the meta replica.
149   * @throws KeeperException if there is any issue fetching/parsing the serialized data.
150   */
151  private HRegionLocation getMetaRegionLocation(int replicaId) throws KeeperException {
152    RegionState metaRegionState;
153    try {
154      byte[] data =
155        ZKUtil.getDataAndWatch(watcher, watcher.getZNodePaths().getZNodeForReplica(replicaId));
156      metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
157    } catch (DeserializationException e) {
158      throw ZKUtil.convert(e);
159    }
160    return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
161  }
162
163  private void updateMetaLocation(String path, ZNodeOpType opType) {
164    if (!isValidMetaPath(path)) {
165      return;
166    }
167    LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
168    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
169    RetryCounter retryCounter = retryCounterFactory.create();
170    HRegionLocation location = null;
171    while (retryCounter.shouldRetry()) {
172      try {
173        if (opType == ZNodeOpType.DELETED) {
174          if (!ZKUtil.watchAndCheckExists(watcher, path)) {
175            // The path does not exist, we've set the watcher and we can break for now.
176            break;
177          }
178          // If it is a transient error and the node appears right away, we fetch the
179          // latest meta state.
180        }
181        location = getMetaRegionLocation(replicaId);
182        break;
183      } catch (KeeperException e) {
184        LOG.debug("Error getting meta location for path {}", path, e);
185        if (!retryCounter.shouldRetry()) {
186          LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
187          break;
188        }
189        try {
190          retryCounter.sleepUntilNextRetry();
191        } catch (InterruptedException ie) {
192          Thread.currentThread().interrupt();
193          return;
194        }
195      }
196    }
197    if (location == null) {
198      cachedMetaLocations.remove(replicaId);
199      return;
200    }
201    cachedMetaLocations.put(replicaId, location);
202  }
203
204  /** Returns Optional list of HRegionLocations for meta replica(s), null if the cache is empty. */
205  public Optional<List<HRegionLocation>> getMetaRegionLocations() {
206    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
207      cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
208    if (snapshot.isEmpty()) {
209      // This could be possible if the master has not successfully initialized yet or meta region
210      // is stuck in some weird state.
211      return Optional.empty();
212    }
213    List<HRegionLocation> result = new ArrayList<>();
214    // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
215    // ArrayValueCollection does not implement toArray().
216    snapshot.values().forEach(location -> result.add(location));
217    return Optional.of(result);
218  }
219
220  /**
221   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
222   * interested in changes to meta znodes.
223   */
224  private boolean isValidMetaPath(String path) {
225    return watcher.getZNodePaths().isMetaZNodePath(path);
226  }
227
228  @Override
229  public void nodeCreated(String path) {
230    updateMetaLocation(path, ZNodeOpType.CREATED);
231  }
232
233  @Override
234  public void nodeDeleted(String path) {
235    updateMetaLocation(path, ZNodeOpType.DELETED);
236  }
237
238  @Override
239  public void nodeDataChanged(String path) {
240    updateMetaLocation(path, ZNodeOpType.CHANGED);
241  }
242
243  @Override
244  public void nodeChildrenChanged(String path) {
245    if (!path.equals(watcher.getZNodePaths().baseZNode)) {
246      return;
247    }
248    loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
249  }
250}