/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
022import static org.apache.hadoop.hbase.HConstants.NINES;
023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
024import static org.apache.hadoop.hbase.HConstants.ZEROES;
025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
028import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
029import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
030import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
031import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
032import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
033import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
034import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
035import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
036
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.HashSet;
041import java.util.Iterator;
042import java.util.LinkedHashMap;
043import java.util.List;
044import java.util.Map;
045import java.util.Optional;
046import java.util.Set;
047import java.util.concurrent.CompletableFuture;
048import java.util.concurrent.ConcurrentHashMap;
049import java.util.concurrent.ConcurrentMap;
050import java.util.concurrent.ConcurrentNavigableMap;
051import java.util.concurrent.ConcurrentSkipListMap;
052import java.util.concurrent.TimeUnit;
053import org.apache.commons.lang3.ObjectUtils;
054import org.apache.hadoop.hbase.CatalogFamilyFormat;
055import org.apache.hadoop.hbase.CatalogReplicaMode;
056import org.apache.hadoop.hbase.HBaseIOException;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.ServerName;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.TableNotFoundException;
063import org.apache.hadoop.hbase.client.Scan.ReadType;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069import org.apache.hbase.thirdparty.com.google.common.base.Objects;
070
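/**
 * The asynchronous locator for regions other than meta.
 * <p/>
 * Region locations are cached per table; on a cache miss the location is looked up with a reverse
 * scan of the meta table and the result is added to the cache.
 */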
074@InterfaceAudience.Private
075class AsyncNonMetaRegionLocator {
076
077  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
078
079  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
080    "hbase.client.meta.max.concurrent.locate.per.table";
081
082  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
083
084  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
085
086  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
087
088  private final AsyncConnectionImpl conn;
089
090  private final int maxConcurrentLocateRequestPerTable;
091
092  private final int locatePrefetchLimit;
093
094  // The mode tells if HedgedRead, LoadBalance mode is supported.
095  // The default mode is CatalogReplicaMode.None.
096  private CatalogReplicaMode metaReplicaMode;
097  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
098
099  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
100
101  private static final class LocateRequest {
102
103    private final byte[] row;
104
105    private final RegionLocateType locateType;
106
107    public LocateRequest(byte[] row, RegionLocateType locateType) {
108      this.row = row;
109      this.locateType = locateType;
110    }
111
112    @Override
113    public int hashCode() {
114      return Bytes.hashCode(row) ^ locateType.hashCode();
115    }
116
117    @Override
118    public boolean equals(Object obj) {
119      if (obj == null || obj.getClass() != LocateRequest.class) {
120        return false;
121      }
122      LocateRequest that = (LocateRequest) obj;
123      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
124    }
125  }
126
127  private static final class RegionLocationsFutureResult {
128    private final CompletableFuture<RegionLocations> future;
129    private final RegionLocations result;
130    private final Throwable e;
131
132    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
133      RegionLocations result, Throwable e) {
134      this.future = future;
135      this.result = result;
136      this.e = e;
137    }
138
139    public void complete() {
140      if (e != null) {
141        future.completeExceptionally(e);
142      }
143      future.complete(result);
144    }
145  }
146
147  private static final class TableCache {
148
149    private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
150      new ConcurrentSkipListMap<>(BYTES_COMPARATOR);
151
152    private final Set<LocateRequest> pendingRequests = new HashSet<>();
153
154    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
155      new LinkedHashMap<>();
156
157    public boolean hasQuota(int max) {
158      return pendingRequests.size() < max;
159    }
160
161    public boolean isPending(LocateRequest req) {
162      return pendingRequests.contains(req);
163    }
164
165    public void send(LocateRequest req) {
166      pendingRequests.add(req);
167    }
168
169    public Optional<LocateRequest> getCandidate() {
170      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
171    }
172
173    public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
174      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
175      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
176        allRequests.entrySet().iterator(); iter.hasNext();) {
177        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
178        if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
179          iter.remove();
180        }
181      }
182      return futureResultList;
183    }
184
185    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
186      RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
187      if (future.isDone()) {
188        return true;
189      }
190      if (locations == null) {
191        return false;
192      }
193      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
194      // we should at least have one location available, otherwise the request should fail and
195      // should not arrive here
196      assert loc != null;
197      boolean completed;
198      if (req.locateType.equals(RegionLocateType.BEFORE)) {
199        // for locating the row before current row, the common case is to find the previous region
200        // in reverse scan, so we check the endKey first. In general, the condition should be
201        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
202        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
203        // endKey.
204        byte[] endKey = loc.getRegion().getEndKey();
205        int c = Bytes.compareTo(endKey, req.row);
206        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey))
207          && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
208      } else {
209        completed = loc.getRegion().containsRow(req.row);
210      }
211      if (completed) {
212        futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
213        return true;
214      } else {
215        return false;
216      }
217    }
218  }
219
220  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) {
221    this.conn = conn;
222    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
223      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
224    this.locatePrefetchLimit =
225      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
226
227    // Get the region locator's meta replica mode.
228    this.metaReplicaMode = CatalogReplicaMode.fromString(
229      conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
230
231    switch (this.metaReplicaMode) {
232      case LOAD_BALANCE:
233        String replicaSelectorClass =
234          conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
235            CatalogReplicaLoadBalanceSimpleSelector.class.getName());
236
237        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
238          .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> {
239            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
240            try {
241              RegionLocations metaLocations = conn.registry.getMetaRegionLocations()
242                .get(conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
243              numOfReplicas = metaLocations.size();
244            } catch (Exception e) {
245              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
246            }
247            return numOfReplicas;
248          });
249        break;
250      case NONE:
251        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
252        boolean useMetaReplicas =
253          conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
254        if (useMetaReplicas) {
255          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
256        }
257        break;
258      default:
259        // Doing nothing
260    }
261  }
262
263  private TableCache getTableCache(TableName tableName) {
264    return computeIfAbsent(cache, tableName, TableCache::new);
265  }
266
267  private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
268    HRegionLocation[] locArr1 = locs1.getRegionLocations();
269    HRegionLocation[] locArr2 = locs2.getRegionLocations();
270    if (locArr1.length != locArr2.length) {
271      return false;
272    }
273    for (int i = 0; i < locArr1.length; i++) {
274      // do not need to compare region info
275      HRegionLocation loc1 = locArr1[i];
276      HRegionLocation loc2 = locArr2[i];
277      if (loc1 == null) {
278        if (loc2 != null) {
279          return false;
280        }
281      } else {
282        if (loc2 == null) {
283          return false;
284        }
285        if (loc1.getSeqNum() != loc2.getSeqNum()) {
286          return false;
287        }
288        if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
289          return false;
290        }
291      }
292    }
293    return true;
294  }
295
296  // if we successfully add the locations to cache, return the locations, otherwise return the one
297  // which prevents us being added. The upper layer can use this value to complete pending requests.
298  private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
299    LOG.trace("Try adding {} to cache", locs);
300    byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
301    for (;;) {
302      RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
303      if (oldLocs == null) {
304        return locs;
305      }
306      // check whether the regions are the same, this usually happens when table is split/merged, or
307      // deleted and recreated again.
308      RegionInfo region = locs.getRegionLocation().getRegion();
309      RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
310      if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
311        RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
312        if (isEqual(mergedLocs, oldLocs)) {
313          // the merged one is the same with the old one, give up
314          LOG.trace("Will not add {} to cache because the old value {} "
315            + " is newer than us or has the same server name."
316            + " Maybe it is updated before we replace it", locs, oldLocs);
317          return oldLocs;
318        }
319        if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
320          return mergedLocs;
321        }
322      } else {
323        // the region is different, here we trust the one we fetched. This maybe wrong but finally
324        // the upper layer can detect this and trigger removal of the wrong locations
325        if (LOG.isDebugEnabled()) {
326          LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}',"
327            + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
328        }
329        if (tableCache.cache.replace(startKey, oldLocs, locs)) {
330          return locs;
331        }
332      }
333    }
334  }
335
336  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
337    Throwable error) {
338    if (error != null) {
339      LOG.warn("Failed to locate region in '" + tableName + "', row='"
340        + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
341    }
342    Optional<LocateRequest> toSend = Optional.empty();
343    TableCache tableCache = getTableCache(tableName);
344    if (locs != null) {
345      RegionLocations addedLocs = addToCache(tableCache, locs);
346      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
347      synchronized (tableCache) {
348        tableCache.pendingRequests.remove(req);
349        futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
350        // Remove a complete locate request in a synchronized block, so the table cache must have
351        // quota to send a candidate request.
352        toSend = tableCache.getCandidate();
353        toSend.ifPresent(r -> tableCache.send(r));
354      }
355      futureResultList.forEach(RegionLocationsFutureResult::complete);
356      toSend.ifPresent(r -> locateInMeta(tableName, r));
357    } else {
358      // we meet an error
359      assert error != null;
360      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
361      synchronized (tableCache) {
362        tableCache.pendingRequests.remove(req);
363        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
364        // already retried several times
365        CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
366        if (future != null) {
367          futureResultList.add(new RegionLocationsFutureResult(future, null, error));
368        }
369        futureResultList.addAll(tableCache.clearCompletedRequests(null));
370        // Remove a complete locate request in a synchronized block, so the table cache must have
371        // quota to send a candidate request.
372        toSend = tableCache.getCandidate();
373        toSend.ifPresent(r -> tableCache.send(r));
374      }
375      futureResultList.forEach(RegionLocationsFutureResult::complete);
376      toSend.ifPresent(r -> locateInMeta(tableName, r));
377    }
378  }
379
380  // return whether we should stop the scan
381  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
382    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
383    if (LOG.isDebugEnabled()) {
384      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
385        Bytes.toStringBinary(req.row), req.locateType, locs);
386    }
387    // remove HRegionLocation with null location, i.e, getServerName returns null.
388    if (locs != null) {
389      locs = locs.removeElementsWithNullLocation();
390    }
391
392    // the default region location should always be presented when fetching from meta, otherwise
393    // let's fail the request.
394    if (locs == null || locs.getDefaultRegionLocation() == null) {
395      complete(tableName, req, null,
396        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
397          tableName, Bytes.toStringBinary(req.row), req.locateType)));
398      return true;
399    }
400    HRegionLocation loc = locs.getDefaultRegionLocation();
401    RegionInfo info = loc.getRegion();
402    if (info == null) {
403      complete(tableName, req, null,
404        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
405          tableName, Bytes.toStringBinary(req.row), req.locateType)));
406      return true;
407    }
408    if (info.isSplitParent()) {
409      return false;
410    }
411    complete(tableName, req, locs, null);
412    return true;
413  }
414
415  private void recordCacheHit() {
416    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
417  }
418
419  private void recordCacheMiss() {
420    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
421  }
422
423  private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
424    int replicaId) {
425    Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
426    if (entry == null) {
427      recordCacheMiss();
428      return null;
429    }
430    RegionLocations locs = entry.getValue();
431    HRegionLocation loc = locs.getRegionLocation(replicaId);
432    if (loc == null) {
433      recordCacheMiss();
434      return null;
435    }
436    byte[] endKey = loc.getRegion().getEndKey();
437    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
438      if (LOG.isTraceEnabled()) {
439        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
440          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
441      }
442      recordCacheHit();
443      return locs;
444    } else {
445      recordCacheMiss();
446      return null;
447    }
448  }
449
450  private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
451    byte[] row, int replicaId) {
452    boolean isEmptyStopRow = isEmptyStopRow(row);
453    Map.Entry<byte[], RegionLocations> entry =
454      isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
455    if (entry == null) {
456      recordCacheMiss();
457      return null;
458    }
459    RegionLocations locs = entry.getValue();
460    HRegionLocation loc = locs.getRegionLocation(replicaId);
461    if (loc == null) {
462      recordCacheMiss();
463      return null;
464    }
465    if (
466      isEmptyStopRow(loc.getRegion().getEndKey())
467        || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
468    ) {
469      if (LOG.isTraceEnabled()) {
470        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
471          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
472      }
473      recordCacheHit();
474      return locs;
475    } else {
476      recordCacheMiss();
477      return null;
478    }
479  }
480
481  private void locateInMeta(TableName tableName, LocateRequest req) {
482    if (LOG.isTraceEnabled()) {
483      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
484        + "', locateType=" + req.locateType + " in meta");
485    }
486    byte[] metaStartKey;
487    if (req.locateType.equals(RegionLocateType.BEFORE)) {
488      if (isEmptyStopRow(req.row)) {
489        byte[] binaryTableName = tableName.getName();
490        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
491      } else {
492        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
493      }
494    } else {
495      metaStartKey = createRegionName(tableName, req.row, NINES, false);
496    }
497    byte[] metaStopKey =
498      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
499    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
500      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
501      .setReadType(ReadType.PREAD);
502
503    switch (this.metaReplicaMode) {
504      case LOAD_BALANCE:
505        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
506        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
507          // If the selector gives a non-primary meta replica region, then go with it.
508          // Otherwise, just go to primary in non-hedgedRead mode.
509          scan.setConsistency(Consistency.TIMELINE);
510          scan.setReplicaId(metaReplicaId);
511        }
512        break;
513      case HEDGED_READ:
514        scan.setConsistency(Consistency.TIMELINE);
515        break;
516      default:
517        // do nothing
518    }
519
520    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
521
522      private boolean completeNormally = false;
523
524      private boolean tableNotFound = true;
525
526      @Override
527      public void onError(Throwable error) {
528        complete(tableName, req, null, error);
529      }
530
531      @Override
532      public void onComplete() {
533        if (tableNotFound) {
534          complete(tableName, req, null, new TableNotFoundException(tableName));
535        } else if (!completeNormally) {
536          complete(tableName, req, null, new IOException(
537            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
538        }
539      }
540
541      @Override
542      public void onNext(Result[] results, ScanController controller) {
543        if (results.length == 0) {
544          return;
545        }
546        tableNotFound = false;
547        int i = 0;
548        for (; i < results.length; i++) {
549          if (onScanNext(tableName, req, results[i])) {
550            completeNormally = true;
551            controller.terminate();
552            i++;
553            break;
554          }
555        }
556        // Add the remaining results into cache
557        if (i < results.length) {
558          TableCache tableCache = getTableCache(tableName);
559          for (; i < results.length; i++) {
560            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
561            if (locs == null) {
562              continue;
563            }
564            HRegionLocation loc = locs.getDefaultRegionLocation();
565            if (loc == null) {
566              continue;
567            }
568            RegionInfo info = loc.getRegion();
569            if (info == null || info.isOffline() || info.isSplitParent()) {
570              continue;
571            }
572            RegionLocations addedLocs = addToCache(tableCache, locs);
573            List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
574            synchronized (tableCache) {
575              futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
576            }
577            futureResultList.forEach(RegionLocationsFutureResult::complete);
578          }
579        }
580      }
581    });
582  }
583
584  private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
585    int replicaId, RegionLocateType locateType) {
586    return locateType.equals(RegionLocateType.BEFORE)
587      ? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
588      : locateRowInCache(tableCache, tableName, row, replicaId);
589  }
590
591  // locateToPrevious is true means we will use the start key of a region to locate the region
592  // placed before it. Used for reverse scan. See the comment of
593  // AsyncRegionLocator.getPreviousRegionLocation.
594  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
595    byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
596    // AFTER should be convert to CURRENT before calling this method
597    assert !locateType.equals(RegionLocateType.AFTER);
598    TableCache tableCache = getTableCache(tableName);
599    if (!reload) {
600      RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
601      if (isGood(locs, replicaId)) {
602        return CompletableFuture.completedFuture(locs);
603      }
604    }
605    CompletableFuture<RegionLocations> future;
606    LocateRequest req;
607    boolean sendRequest = false;
608    synchronized (tableCache) {
609      // check again
610      if (!reload) {
611        RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
612        if (isGood(locs, replicaId)) {
613          return CompletableFuture.completedFuture(locs);
614        }
615      }
616      req = new LocateRequest(row, locateType);
617      future = tableCache.allRequests.get(req);
618      if (future == null) {
619        future = new CompletableFuture<>();
620        tableCache.allRequests.put(req, future);
621        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
622          tableCache.send(req);
623          sendRequest = true;
624        }
625      }
626    }
627    if (sendRequest) {
628      locateInMeta(tableName, req);
629    }
630    return future;
631  }
632
633  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
634    int replicaId, RegionLocateType locateType, boolean reload) {
635    // as we know the exact row after us, so we can just create the new row, and use the same
636    // algorithm to locate it.
637    if (locateType.equals(RegionLocateType.AFTER)) {
638      row = createClosestRowAfter(row);
639      locateType = RegionLocateType.CURRENT;
640    }
641    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
642  }
643
644  private void recordClearRegionCache() {
645    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
646  }
647
648  private void removeLocationFromCache(HRegionLocation loc) {
649    TableCache tableCache = cache.get(loc.getRegion().getTable());
650    if (tableCache == null) {
651      return;
652    }
653    byte[] startKey = loc.getRegion().getStartKey();
654    for (;;) {
655      RegionLocations oldLocs = tableCache.cache.get(startKey);
656      if (oldLocs == null) {
657        return;
658      }
659      HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
660      if (!canUpdateOnError(loc, oldLoc)) {
661        return;
662      }
663      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
664      // with timestamp internally. Next time the client looks up the same location,
665      // it will pick a different meta replica region.
666      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
667        metaReplicaSelector.onError(loc);
668      }
669
670      RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
671      if (newLocs == null) {
672        if (tableCache.cache.remove(startKey, oldLocs)) {
673          recordClearRegionCache();
674          return;
675        }
676      } else {
677        if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
678          recordClearRegionCache();
679          return;
680        }
681      }
682    }
683  }
684
685  void addLocationToCache(HRegionLocation loc) {
686    addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
687  }
688
689  private HRegionLocation getCachedLocation(HRegionLocation loc) {
690    TableCache tableCache = cache.get(loc.getRegion().getTable());
691    if (tableCache == null) {
692      return null;
693    }
694    RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
695    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
696  }
697
698  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
699    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
700    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
701      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
702  }
703
704  void clearCache(TableName tableName) {
705    TableCache tableCache = cache.remove(tableName);
706    if (tableCache == null) {
707      return;
708    }
709    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
710    synchronized (tableCache) {
711      if (!tableCache.allRequests.isEmpty()) {
712        IOException error = new IOException("Cache cleared");
713        tableCache.allRequests.values().forEach(f -> {
714          futureResultList.add(new RegionLocationsFutureResult(f, null, error));
715        });
716      }
717    }
718    futureResultList.forEach(RegionLocationsFutureResult::complete);
719    conn.getConnectionMetrics()
720      .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
721  }
722
723  void clearCache() {
724    cache.clear();
725  }
726
727  void clearCache(ServerName serverName) {
728    for (TableCache tableCache : cache.values()) {
729      for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
730        byte[] regionName = entry.getKey();
731        RegionLocations locs = entry.getValue();
732        RegionLocations newLocs = locs.removeByServer(serverName);
733        if (locs == newLocs) {
734          continue;
735        }
736        if (newLocs.isEmpty()) {
737          tableCache.cache.remove(regionName, locs);
738        } else {
739          tableCache.cache.replace(regionName, locs, newLocs);
740        }
741      }
742    }
743  }
744
745  // only used for testing whether we have cached the location for a region.
746  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
747    TableCache tableCache = cache.get(tableName);
748    if (tableCache == null) {
749      return null;
750    }
751    return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
752  }
753
754  // only used for testing whether we have cached the location for a table.
755  int getNumberOfCachedRegionLocations(TableName tableName) {
756    TableCache tableCache = cache.get(tableName);
757    if (tableCache == null) {
758      return 0;
759    }
760    return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum();
761  }
762}