001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
022import static org.apache.hadoop.hbase.HConstants.NINES;
023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
024import static org.apache.hadoop.hbase.HConstants.ZEROES;
025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
028import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
029import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
030import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
031import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
032import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
033import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
034import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
035import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
036
037import java.io.IOException;
038import java.util.Arrays;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.Map;
043import java.util.Optional;
044import java.util.Set;
045import java.util.concurrent.CompletableFuture;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ConcurrentMap;
048import java.util.concurrent.ConcurrentNavigableMap;
049import java.util.concurrent.ConcurrentSkipListMap;
050import java.util.concurrent.TimeUnit;
051import org.apache.commons.lang3.ObjectUtils;
052import org.apache.hadoop.hbase.HBaseIOException;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionLocation;
055import org.apache.hadoop.hbase.MetaTableAccessor;
056import org.apache.hadoop.hbase.RegionLocations;
057import org.apache.hadoop.hbase.ServerName;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.TableNotFoundException;
060import org.apache.hadoop.hbase.client.Scan.ReadType;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hbase.thirdparty.com.google.common.base.Objects;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
068 * The asynchronous locator for regions other than meta.
069 */
070@InterfaceAudience.Private
071class AsyncNonMetaRegionLocator {
072
073  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
074
075  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
076    "hbase.client.meta.max.concurrent.locate.per.table";
077
078  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
079
080  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
081
082  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
083
084  private final AsyncConnectionImpl conn;
085
086  private final int maxConcurrentLocateRequestPerTable;
087
088  private final int locatePrefetchLimit;
089
090  // The mode tells if HedgedRead, LoadBalance mode is supported.
091  // The default mode is CatalogReplicaMode.None.
092  private CatalogReplicaMode metaReplicaMode;
093  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
094
095  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
096
097  private static final class LocateRequest {
098
099    private final byte[] row;
100
101    private final RegionLocateType locateType;
102
103    public LocateRequest(byte[] row, RegionLocateType locateType) {
104      this.row = row;
105      this.locateType = locateType;
106    }
107
108    @Override
109    public int hashCode() {
110      return Bytes.hashCode(row) ^ locateType.hashCode();
111    }
112
113    @Override
114    public boolean equals(Object obj) {
115      if (obj == null || obj.getClass() != LocateRequest.class) {
116        return false;
117      }
118      LocateRequest that = (LocateRequest) obj;
119      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
120    }
121  }
122
123  private static final class TableCache {
124
125    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
126      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
127
128    private final Set<LocateRequest> pendingRequests = new HashSet<>();
129
130    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
131      new LinkedHashMap<>();
132
133    public boolean hasQuota(int max) {
134      return pendingRequests.size() < max;
135    }
136
137    public boolean isPending(LocateRequest req) {
138      return pendingRequests.contains(req);
139    }
140
141    public void send(LocateRequest req) {
142      pendingRequests.add(req);
143    }
144
145    public Optional<LocateRequest> getCandidate() {
146      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
147    }
148
149    public void clearCompletedRequests(RegionLocations locations) {
150      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
151        allRequests.entrySet().iterator(); iter.hasNext();) {
152        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
153        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
154          iter.remove();
155        }
156      }
157    }
158
159    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
160        RegionLocations locations) {
161      if (future.isDone()) {
162        return true;
163      }
164      if (locations == null) {
165        return false;
166      }
167      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
168      // we should at least have one location available, otherwise the request should fail and
169      // should not arrive here
170      assert loc != null;
171      boolean completed;
172      if (req.locateType.equals(RegionLocateType.BEFORE)) {
173        // for locating the row before current row, the common case is to find the previous region
174        // in reverse scan, so we check the endKey first. In general, the condition should be
175        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
176        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
177        // endKey.
178        byte[] endKey = loc.getRegion().getEndKey();
179        int c = Bytes.compareTo(endKey, req.row);
180        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) &&
181          Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
182      } else {
183        completed = loc.getRegion().containsRow(req.row);
184      }
185      if (completed) {
186        future.complete(locations);
187        return true;
188      } else {
189        return false;
190      }
191    }
192  }
193
194  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
195    this.conn = conn;
196    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
197      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
198    this.locatePrefetchLimit =
199      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
200
201    // Get the region locator's meta replica mode.
202    this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
203      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
204
205    switch (this.metaReplicaMode) {
206      case LOAD_BALANCE:
207        String replicaSelectorClass = conn.getConfiguration().
208          get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
209          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
210
211        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
212          replicaSelectorClass, META_TABLE_NAME, conn.getChoreService(), () -> {
213            int numOfReplicas = 1;
214            try {
215              RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
216                conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
217              numOfReplicas = metaLocations.size();
218            } catch (Exception e) {
219              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
220            }
221            return numOfReplicas;
222          });
223        break;
224      case NONE:
225        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
226
227        boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
228          DEFAULT_USE_META_REPLICAS);
229        if (useMetaReplicas) {
230          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
231        }
232        break;
233      default:
234        // Doing nothing
235    }
236  }
237
238  private TableCache getTableCache(TableName tableName) {
239    return computeIfAbsent(cache, tableName, TableCache::new);
240  }
241
242  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
243    HRegionLocation[] locArr1 = locs1.getRegionLocations();
244    HRegionLocation[] locArr2 = locs2.getRegionLocations();
245    if (locArr1.length != locArr2.length) {
246      return false;
247    }
248    for (int i = 0; i < locArr1.length; i++) {
249      // do not need to compare region info
250      HRegionLocation loc1 = locArr1[i];
251      HRegionLocation loc2 = locArr2[i];
252      if (loc1 == null) {
253        if (loc2 != null) {
254          return false;
255        }
256      } else {
257        if (loc2 == null) {
258          return false;
259        }
260        if (loc1.getSeqNum() != loc2.getSeqNum()) {
261          return false;
262        }
263        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
264          return false;
265        }
266      }
267    }
268    return true;
269  }
270
271  // if we successfully add the locations to cache, return the locations, otherwise return the one
272  // which prevents us being added. The upper layer can use this value to complete pending requests.
273  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
274    LOG.trace("Try adding {} to cache", locs);
275    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
276    for (;;) {
277      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
278      if (oldLocs == null) {
279        return locs;
280      }
281      // check whether the regions are the same, this usually happens when table is split/merged, or
282      // deleted and recreated again.
283      RegionInfo region = locs.getRegionLocation().getRegion();
284      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
285      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
286        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
287        if (isEqual(mergedLocs, oldLocs)) {
288          // the merged one is the same with the old one, give up
289          LOG.trace("Will not add {} to cache because the old value {} " +
290            " is newer than us or has the same server name." +
291            " Maybe it is updated before we replace it", locs, oldLocs);
292          return oldLocs;
293        }
294        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
295          return mergedLocs;
296        }
297      } else {
298        // the region is different, here we trust the one we fetched. This maybe wrong but finally
299        // the upper layer can detect this and trigger removal of the wrong locations
300        if (LOG.isDebugEnabled()) {
301          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," +
302            " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
303        }
304        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
305          return locs;
306        }
307      }
308    }
309  }
310
311  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
312      Throwable error) {
313    if (error != null) {
314      LOG.warn("Failed to locate region in '" + tableName + "', row='" +
315        Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
316    }
317    Optional<LocateRequest> toSend = Optional.empty();
318    TableCache tableCache = getTableCache(tableName);
319    if (locs != null) {
320      RegionLocations addedLocs = addToCache(tableCache, locs);
321      synchronized (tableCache) {
322        tableCache.pendingRequests.remove(req);
323        tableCache.clearCompletedRequests(addedLocs);
324        // Remove a complete locate request in a synchronized block, so the table cache must have
325        // quota to send a candidate request.
326        toSend = tableCache.getCandidate();
327        toSend.ifPresent(r -> tableCache.send(r));
328      }
329      toSend.ifPresent(r -> locateInMeta(tableName, r));
330    } else {
331      // we meet an error
332      assert error != null;
333      synchronized (tableCache) {
334        tableCache.pendingRequests.remove(req);
335        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
336        // already retried several times
337        CompletableFuture<?> future = tableCache.allRequests.remove(req);
338        if (future != null) {
339          future.completeExceptionally(error);
340        }
341        tableCache.clearCompletedRequests(null);
342        // Remove a complete locate request in a synchronized block, so the table cache must have
343        // quota to send a candidate request.
344        toSend = tableCache.getCandidate();
345        toSend.ifPresent(r -> tableCache.send(r));
346      }
347      toSend.ifPresent(r -> locateInMeta(tableName, r));
348    }
349  }
350
351  // return whether we should stop the scan
352  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
353    RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
354    if (LOG.isDebugEnabled()) {
355      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
356        Bytes.toStringBinary(req.row), req.locateType, locs);
357    }
358    // remove HRegionLocation with null location, i.e, getServerName returns null.
359    if (locs != null) {
360      locs = locs.removeElementsWithNullLocation();
361    }
362
363    // the default region location should always be presented when fetching from meta, otherwise
364    // let's fail the request.
365    if (locs == null || locs.getDefaultRegionLocation() == null) {
366      complete(tableName, req, null,
367        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
368          tableName, Bytes.toStringBinary(req.row), req.locateType)));
369      return true;
370    }
371    HRegionLocation loc = locs.getDefaultRegionLocation();
372    RegionInfo info = loc.getRegion();
373    if (info == null) {
374      complete(tableName, req, null,
375        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
376          tableName, Bytes.toStringBinary(req.row), req.locateType)));
377      return true;
378    }
379    if (info.isSplitParent()) {
380      return false;
381    }
382    complete(tableName, req, locs, null);
383    return true;
384  }
385
386  private void recordCacheHit() {
387    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
388  }
389
390  private void recordCacheMiss() {
391    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
392  }
393
394  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
395      int replicaId) {
396    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
397    if (entry == null) {
398      recordCacheMiss();
399      return null;
400    }
401    RegionLocations locs = entry.getValue();
402    HRegionLocation loc = locs.getRegionLocation(replicaId);
403    if (loc == null) {
404      recordCacheMiss();
405      return null;
406    }
407    byte[] endKey = loc.getRegion().getEndKey();
408    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
409      if (LOG.isTraceEnabled()) {
410        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
411          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
412      }
413      recordCacheHit();
414      return locs;
415    } else {
416      recordCacheMiss();
417      return null;
418    }
419  }
420
421  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
422      byte[] row, int replicaId) {
423    boolean isEmptyStopRow = isEmptyStopRow(row);
424    Map.Entry<byte[], RegionLocations> entry =
425      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
426    if (entry == null) {
427      recordCacheMiss();
428      return null;
429    }
430    RegionLocations locs = entry.getValue();
431    HRegionLocation loc = locs.getRegionLocation(replicaId);
432    if (loc == null) {
433      recordCacheMiss();
434      return null;
435    }
436    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
437      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
438      if (LOG.isTraceEnabled()) {
439        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
440          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
441      }
442      recordCacheHit();
443      return locs;
444    } else {
445      recordCacheMiss();
446      return null;
447    }
448  }
449
450  private void locateInMeta(TableName tableName, LocateRequest req) {
451    if (LOG.isTraceEnabled()) {
452      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) +
453        "', locateType=" + req.locateType + " in meta");
454    }
455    byte[] metaStartKey;
456    if (req.locateType.equals(RegionLocateType.BEFORE)) {
457      if (isEmptyStopRow(req.row)) {
458        byte[] binaryTableName = tableName.getName();
459        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
460      } else {
461        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
462      }
463    } else {
464      metaStartKey = createRegionName(tableName, req.row, NINES, false);
465    }
466    byte[] metaStopKey =
467      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
468    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
469      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
470      .setReadType(ReadType.PREAD);
471
472    switch (this.metaReplicaMode) {
473      case LOAD_BALANCE:
474        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
475        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
476          // If the selector gives a non-primary meta replica region, then go with it.
477          // Otherwise, just go to primary in non-hedgedRead mode.
478          scan.setConsistency(Consistency.TIMELINE);
479          scan.setReplicaId(metaReplicaId);
480        }
481        break;
482      case HEDGED_READ:
483        scan.setConsistency(Consistency.TIMELINE);
484        break;
485      default:
486        // do nothing
487    }
488
489    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
490
491      private boolean completeNormally = false;
492
493      private boolean tableNotFound = true;
494
495      @Override
496      public void onError(Throwable error) {
497        complete(tableName, req, null, error);
498      }
499
500      @Override
501      public void onComplete() {
502        if (tableNotFound) {
503          complete(tableName, req, null, new TableNotFoundException(tableName));
504        } else if (!completeNormally) {
505          complete(tableName, req, null, new IOException(
506            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
507        }
508      }
509
510      @Override
511      public void onNext(Result[] results, ScanController controller) {
512        if (results.length == 0) {
513          return;
514        }
515        tableNotFound = false;
516        int i = 0;
517        for (; i < results.length; i++) {
518          if (onScanNext(tableName, req, results[i])) {
519            completeNormally = true;
520            controller.terminate();
521            i++;
522            break;
523          }
524        }
525        // Add the remaining results into cache
526        if (i < results.length) {
527          TableCache tableCache = getTableCache(tableName);
528          for (; i < results.length; i++) {
529            RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
530            if (locs == null) {
531              continue;
532            }
533            HRegionLocation loc = locs.getDefaultRegionLocation();
534            if (loc == null) {
535              continue;
536            }
537            RegionInfo info = loc.getRegion();
538            if (info == null || info.isOffline() || info.isSplitParent()) {
539              continue;
540            }
541            RegionLocations addedLocs = addToCache(tableCache, locs);
542            synchronized (tableCache) {
543              tableCache.clearCompletedRequests(addedLocs);
544            }
545          }
546        }
547      }
548    });
549  }
550
551  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
552      int replicaId, RegionLocateType locateType) {
553    return locateType.equals(RegionLocateType.BEFORE)
554      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
555      : locateRowInCache(tableCache, tableName, row, replicaId);
556  }
557
558  // locateToPrevious is true means we will use the start key of a region to locate the region
559  // placed before it. Used for reverse scan. See the comment of
560  // AsyncRegionLocator.getPreviousRegionLocation.
561  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
562      byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
563    // AFTER should be convert to CURRENT before calling this method
564    assert !locateType.equals(RegionLocateType.AFTER);
565    TableCache tableCache = getTableCache(tableName);
566    if (!reload) {
567      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
568      if (isGood(locs, replicaId)) {
569        return CompletableFuture.completedFuture(locs);
570      }
571    }
572    CompletableFuture<RegionLocations> future;
573    LocateRequest req;
574    boolean sendRequest = false;
575    synchronized (tableCache) {
576      // check again
577      if (!reload) {
578        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
579        if (isGood(locs, replicaId)) {
580          return CompletableFuture.completedFuture(locs);
581        }
582      }
583      req = new LocateRequest(row, locateType);
584      future = tableCache.allRequests.get(req);
585      if (future == null) {
586        future = new CompletableFuture<>();
587        tableCache.allRequests.put(req, future);
588        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
589          tableCache.send(req);
590          sendRequest = true;
591        }
592      }
593    }
594    if (sendRequest) {
595      locateInMeta(tableName, req);
596    }
597    return future;
598  }
599
600  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
601      int replicaId, RegionLocateType locateType, boolean reload) {
602    // as we know the exact row after us, so we can just create the new row, and use the same
603    // algorithm to locate it.
604    if (locateType.equals(RegionLocateType.AFTER)) {
605      row = createClosestRowAfter(row);
606      locateType = RegionLocateType.CURRENT;
607    }
608    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
609  }
610
611  private void recordClearRegionCache() {
612    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
613  }
614
615  private void removeLocationFromCache(HRegionLocation loc) {
616    TableCache tableCache = cache.get(loc.getRegion().getTable());
617    if (tableCache == null) {
618      return;
619    }
620    byte[] startKey = loc.getRegion().getStartKey();
621    for (;;) {
622      RegionLocations oldLocs = tableCache.cache.get(startKey);
623      if (oldLocs == null) {
624        return;
625      }
626      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
627      if (!canUpdateOnError(loc, oldLoc)) {
628        return;
629      }
630      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
631      // with timestamp internally. Next time the client looks up the same location,
632      // it will pick a different meta replica region.
633      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
634        metaReplicaSelector.onError(loc);
635      }
636
637      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
638      if (newLocs == null) {
639        if (tableCache.cache.remove(startKey, oldLocs)) {
640          recordClearRegionCache();
641          return;
642        }
643      } else {
644        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
645          recordClearRegionCache();
646          return;
647        }
648      }
649    }
650  }
651
652  private void addLocationToCache(HRegionLocation loc) {
653    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
654  }
655
656  private HRegionLocation getCachedLocation(HRegionLocation loc) {
657    TableCache tableCache = cache.get(loc.getRegion().getTable());
658    if (tableCache == null) {
659      return null;
660    }
661    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
662    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
663  }
664
665  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
666    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
667    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
668      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
669  }
670
671  void clearCache(TableName tableName) {
672    TableCache tableCache = cache.remove(tableName);
673    if (tableCache == null) {
674      return;
675    }
676    synchronized (tableCache) {
677      if (!tableCache.allRequests.isEmpty()) {
678        IOException error = new IOException("Cache cleared");
679        tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
680      }
681    }
682    conn.getConnectionMetrics()
683      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
684  }
685
686  void clearCache() {
687    cache.clear();
688  }
689
690  void clearCache(ServerName serverName) {
691    for (TableCache tableCache : cache.values()) {
692      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
693        byte[] regionName = entry.getKey();
694        RegionLocations locs = entry.getValue();
695        RegionLocations newLocs = locs.removeByServer(serverName);
696        if (locs == newLocs) {
697          continue;
698        }
699        if (newLocs.isEmpty()) {
700          tableCache.cache.remove(regionName, locs);
701        } else {
702          tableCache.cache.replace(regionName, locs, newLocs);
703        }
704      }
705    }
706  }
707
708  // only used for testing whether we have cached the location for a region.
709  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
710    TableCache tableCache = cache.get(tableName);
711    if (tableCache == null) {
712      return null;
713    }
714    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
715  }
716}