001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
021import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
022import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
023import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
024import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
025
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.RegionLocations;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * The asynchronous locator for meta region.
035 */
036@InterfaceAudience.Private
037class AsyncMetaRegionLocator {
038
039  private final ConnectionRegistry registry;
040
041  private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
042
043  private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
044    new AtomicReference<>();
045
046  AsyncMetaRegionLocator(ConnectionRegistry registry) {
047    this.registry = registry;
048  }
049
050  /**
051   * Get the region locations for meta region. If the location for the given replica is not
052   * available in the cached locations, then fetch from the HBase cluster.
053   * <p/>
054   * The <code>replicaId</code> parameter is important. If the region replication config for meta
055   * region is changed, then the cached region locations may not have the locations for new
056   * replicas. If we do not check the location for the given replica, we will always return the
057   * cached region locations and cause an infinite loop.
058   */
059  CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
060    return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
061      registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
062  }
063
064  private HRegionLocation getCacheLocation(HRegionLocation loc) {
065    RegionLocations locs = metaRegionLocations.get();
066    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
067  }
068
069  private void addLocationToCache(HRegionLocation loc) {
070    for (;;) {
071      int replicaId = loc.getRegion().getReplicaId();
072      RegionLocations oldLocs = metaRegionLocations.get();
073      if (oldLocs == null) {
074        RegionLocations newLocs = createRegionLocations(loc);
075        if (metaRegionLocations.compareAndSet(null, newLocs)) {
076          return;
077        }
078      }
079      HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId);
080      if (
081        oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum()
082          || oldLoc.getServerName().equals(loc.getServerName()))
083      ) {
084        return;
085      }
086      RegionLocations newLocs = replaceRegionLocation(oldLocs, loc);
087      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
088        return;
089      }
090    }
091  }
092
093  private void removeLocationFromCache(HRegionLocation loc) {
094    for (;;) {
095      RegionLocations oldLocs = metaRegionLocations.get();
096      if (oldLocs == null) {
097        return;
098      }
099      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
100      if (!canUpdateOnError(loc, oldLoc)) {
101        return;
102      }
103      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
104      if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) {
105        return;
106      }
107    }
108  }
109
110  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
111    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
112      this::addLocationToCache, this::removeLocationFromCache, null);
113  }
114
115  void clearCache() {
116    metaRegionLocations.set(null);
117  }
118
119  void clearCache(ServerName serverName) {
120    for (;;) {
121      RegionLocations locs = metaRegionLocations.get();
122      if (locs == null) {
123        return;
124      }
125      RegionLocations newLocs = locs.removeByServer(serverName);
126      if (locs == newLocs) {
127        return;
128      }
129      if (newLocs.isEmpty()) {
130        newLocs = null;
131      }
132      if (metaRegionLocations.compareAndSet(locs, newLocs)) {
133        return;
134      }
135    }
136  }
137
138  // only used for testing whether we have cached the location for a region.
139  RegionLocations getRegionLocationInCache() {
140    return metaRegionLocations.get();
141  }
142
143  // only used for testing whether we have cached the location for a table.
144  int getNumberOfCachedRegionLocations() {
145    RegionLocations locs = metaRegionLocations.get();
146    return locs != null ? locs.numNonNullElements() : 0;
147  }
148}