001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
022import static org.apache.hadoop.hbase.HConstants.NINES;
023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
024import static org.apache.hadoop.hbase.HConstants.ZEROES;
025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
030import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
031import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
032import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
033
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.LinkedHashMap;
040import java.util.List;
041import java.util.Map;
042import java.util.Optional;
043import java.util.Set;
044import java.util.concurrent.CompletableFuture;
045import java.util.concurrent.ConcurrentHashMap;
046import java.util.concurrent.ConcurrentMap;
047import java.util.concurrent.TimeUnit;
048import org.apache.commons.lang3.ObjectUtils;
049import org.apache.hadoop.hbase.CatalogFamilyFormat;
050import org.apache.hadoop.hbase.CatalogReplicaMode;
051import org.apache.hadoop.hbase.HBaseIOException;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HRegionLocation;
054import org.apache.hadoop.hbase.RegionLocations;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.TableName;
057import org.apache.hadoop.hbase.TableNotFoundException;
058import org.apache.hadoop.hbase.client.Scan.ReadType;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.FutureUtils;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
066import org.apache.hbase.thirdparty.io.netty.util.Timeout;
067import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
068
069/**
070 * The asynchronous locator for regions other than meta.
071 */
072@InterfaceAudience.Private
073class AsyncNonMetaRegionLocator {
074
075  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
076
077  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
078    "hbase.client.meta.max.concurrent.locate.per.table";
079
080  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
081
082  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
083
084  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
085
086  private final AsyncConnectionImpl conn;
087
088  private final int maxConcurrentLocateRequestPerTable;
089
090  private final int locatePrefetchLimit;
091
092  // The mode tells if HedgedRead, LoadBalance mode is supported.
093  // The default mode is CatalogReplicaMode.None.
094  private CatalogReplicaMode metaReplicaMode;
095  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
096
097  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
098
099  private final HashedWheelTimer retryTimer;
100
101  private static final class LocateRequest {
102
103    private final byte[] row;
104
105    private final RegionLocateType locateType;
106
107    public LocateRequest(byte[] row, RegionLocateType locateType) {
108      this.row = row;
109      this.locateType = locateType;
110    }
111
112    @Override
113    public int hashCode() {
114      return Bytes.hashCode(row) ^ locateType.hashCode();
115    }
116
117    @Override
118    public boolean equals(Object obj) {
119      if (obj == null || obj.getClass() != LocateRequest.class) {
120        return false;
121      }
122      LocateRequest that = (LocateRequest) obj;
123      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
124    }
125  }
126
127  private static final class RegionLocationsFutureResult {
128    private final CompletableFuture<RegionLocations> future;
129    private final RegionLocations result;
130    private final Throwable e;
131
132    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
133      RegionLocations result, Throwable e) {
134      this.future = future;
135      this.result = result;
136      this.e = e;
137    }
138
139    public void complete() {
140      if (e != null) {
141        future.completeExceptionally(e);
142      }
143      future.complete(result);
144    }
145  }
146
147  private static final class TableCache {
148
149    private final Set<LocateRequest> pendingRequests = new HashSet<>();
150
151    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
152      new LinkedHashMap<>();
153    private final AsyncRegionLocationCache regionLocationCache;
154
155    public TableCache(TableName tableName) {
156      regionLocationCache = new AsyncRegionLocationCache(tableName);
157    }
158
159    public boolean hasQuota(int max) {
160      return pendingRequests.size() < max;
161    }
162
163    public boolean isPending(LocateRequest req) {
164      return pendingRequests.contains(req);
165    }
166
167    public void send(LocateRequest req) {
168      pendingRequests.add(req);
169    }
170
171    public Optional<LocateRequest> getCandidate() {
172      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
173    }
174
175    public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
176      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
177      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
178        allRequests.entrySet().iterator(); iter.hasNext();) {
179        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
180        if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
181          iter.remove();
182        }
183      }
184      return futureResultList;
185    }
186
187    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
188      RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
189      if (future.isDone()) {
190        return true;
191      }
192      if (locations == null) {
193        return false;
194      }
195      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
196      // we should at least have one location available, otherwise the request should fail and
197      // should not arrive here
198      assert loc != null;
199      boolean completed;
200      if (req.locateType.equals(RegionLocateType.BEFORE)) {
201        // for locating the row before current row, the common case is to find the previous region
202        // in reverse scan, so we check the endKey first. In general, the condition should be
203        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
204        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
205        // endKey.
206        byte[] endKey = loc.getRegion().getEndKey();
207        int c = Bytes.compareTo(endKey, req.row);
208        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey))
209          && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
210      } else {
211        completed = loc.getRegion().containsRow(req.row);
212      }
213      if (completed) {
214        futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
215        return true;
216      } else {
217        return false;
218      }
219    }
220  }
221
222  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
223    this.conn = conn;
224    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
225      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
226    this.locatePrefetchLimit =
227      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
228
229    // Get the region locator's meta replica mode.
230    this.metaReplicaMode = CatalogReplicaMode.fromString(
231      conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
232
233    switch (this.metaReplicaMode) {
234      case LOAD_BALANCE:
235        String replicaSelectorClass =
236          conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
237            CatalogReplicaLoadBalanceSimpleSelector.class.getName());
238
239        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
240          .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> {
241            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
242            try {
243              RegionLocations metaLocations = conn.registry.getMetaRegionLocations()
244                .get(conn.connConf.getMetaReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
245              numOfReplicas = metaLocations.size();
246            } catch (Exception e) {
247              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
248            }
249            return numOfReplicas;
250          });
251        break;
252      case NONE:
253        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
254        boolean useMetaReplicas =
255          conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
256        if (useMetaReplicas) {
257          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
258        }
259        break;
260      default:
261        // Doing nothing
262    }
263
264    // The interval of invalidate meta cache task,
265    // if disable/delete table using same connection or usually create a new connection, no need to
266    // set it.
267    // Suggest set it to 24h or a higher value, because disable/delete table usually not very
268    // frequently.
269    this.retryTimer = retryTimer;
270    long metaCacheInvalidateInterval = conn.getConfiguration()
271      .getLong("hbase.client.connection.metacache.invalidate-interval.ms", 0L);
272    if (metaCacheInvalidateInterval > 0) {
273      TimerTask invalidateMetaCacheTask = getInvalidateMetaCacheTask(metaCacheInvalidateInterval);
274      this.retryTimer.newTimeout(invalidateMetaCacheTask, metaCacheInvalidateInterval,
275        TimeUnit.MILLISECONDS);
276    }
277  }
278
279  private TableCache getTableCache(TableName tableName) {
280    return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
281  }
282
283  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
284    Throwable error) {
285    if (error != null) {
286      LOG.warn("Failed to locate region in '" + tableName + "', row='"
287        + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
288    }
289    Optional<LocateRequest> toSend = Optional.empty();
290    TableCache tableCache = getTableCache(tableName);
291    if (locs != null) {
292      RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
293      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
294      synchronized (tableCache) {
295        tableCache.pendingRequests.remove(req);
296        futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
297        // Remove a complete locate request in a synchronized block, so the table cache must have
298        // quota to send a candidate request.
299        toSend = tableCache.getCandidate();
300        toSend.ifPresent(r -> tableCache.send(r));
301      }
302      futureResultList.forEach(RegionLocationsFutureResult::complete);
303      toSend.ifPresent(r -> locateInMeta(tableName, r));
304    } else {
305      // we meet an error
306      assert error != null;
307      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
308      synchronized (tableCache) {
309        tableCache.pendingRequests.remove(req);
310        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
311        // already retried several times
312        CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
313        if (future != null) {
314          futureResultList.add(new RegionLocationsFutureResult(future, null, error));
315        }
316        futureResultList.addAll(tableCache.clearCompletedRequests(null));
317        // Remove a complete locate request in a synchronized block, so the table cache must have
318        // quota to send a candidate request.
319        toSend = tableCache.getCandidate();
320        toSend.ifPresent(r -> tableCache.send(r));
321      }
322      futureResultList.forEach(RegionLocationsFutureResult::complete);
323      toSend.ifPresent(r -> locateInMeta(tableName, r));
324    }
325  }
326
327  // return whether we should stop the scan
328  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
329    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
330    if (LOG.isDebugEnabled()) {
331      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
332        Bytes.toStringBinary(req.row), req.locateType, locs);
333    }
334    // remove HRegionLocation with null location, i.e, getServerName returns null.
335    if (locs != null) {
336      locs = locs.removeElementsWithNullLocation();
337    }
338
339    // the default region location should always be presented when fetching from meta, otherwise
340    // let's fail the request.
341    if (locs == null || locs.getDefaultRegionLocation() == null) {
342      complete(tableName, req, null,
343        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
344          tableName, Bytes.toStringBinary(req.row), req.locateType)));
345      return true;
346    }
347    HRegionLocation loc = locs.getDefaultRegionLocation();
348    RegionInfo info = loc.getRegion();
349    if (info == null) {
350      complete(tableName, req, null,
351        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
352          tableName, Bytes.toStringBinary(req.row), req.locateType)));
353      return true;
354    }
355    if (info.isSplitParent()) {
356      return false;
357    }
358    complete(tableName, req, locs, null);
359    return true;
360  }
361
362  private void recordCacheHit() {
363    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
364  }
365
366  private void recordCacheMiss() {
367    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
368  }
369
370  private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
371    RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
372    if (locs == null) {
373      recordCacheMiss();
374    } else {
375      recordCacheHit();
376    }
377    return locs;
378  }
379
380  private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
381    RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
382    if (locs == null) {
383      recordCacheMiss();
384    } else {
385      recordCacheHit();
386    }
387    return locs;
388  }
389
  /**
   * Sends the given locate request to the meta table as a reversed scan and feeds the rows that
   * come back into {@link #onScanNext} and the location cache. The outcome is always reported
   * through {@link #complete}.
   * @param tableName the table whose region is being located
   * @param req       the request (row and locate type) to resolve
   */
  private void locateInMeta(TableName tableName, LocateRequest req) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
        + "', locateType=" + req.locateType + " in meta");
    }
    // Work out the meta row at which the reversed scan starts.
    byte[] metaStartKey;
    if (req.locateType.equals(RegionLocateType.BEFORE)) {
      if (isEmptyStopRow(req.row)) {
        // the table name bytes followed by one extra zero byte (Arrays.copyOf pads with zeros)
        byte[] binaryTableName = tableName.getName();
        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
      } else {
        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
      }
    } else {
      metaStartKey = createRegionName(tableName, req.row, NINES, false);
    }
    // The scan stops at the region name built from the table's empty start row; the stop row is
    // passed with the flag set to true (presumably "inclusive" -- confirm against Scan).
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
      .setReadType(ReadType.PREAD);

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // If the selector gives a non-primary meta replica region, then go with it.
          // Otherwise, just go to primary in non-hedgedRead mode.
          scan.setConsistency(Consistency.TIMELINE);
          scan.setReplicaId(metaReplicaId);
        }
        break;
      case HEDGED_READ:
        scan.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // do nothing
    }

    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {

      // set once onScanNext asked to stop the scan, i.e. the request has already been finished
      // through complete(...) (with locations or with an error)
      private boolean completeNormally = false;

      // stays true until the scan delivers at least one row
      private boolean tableNotFound = true;

      @Override
      public void onError(Throwable error) {
        complete(tableName, req, null, error);
      }

      @Override
      public void onComplete() {
        if (tableNotFound) {
          // the scan finished without returning a single row
          complete(tableName, req, null, new TableNotFoundException(tableName));
        } else if (!completeNormally) {
          // rows were returned but none of them finished the request
          complete(tableName, req, null, new IOException(
            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
        }
      }

      @Override
      public void onNext(Result[] results, ScanController controller) {
        if (results.length == 0) {
          return;
        }
        tableNotFound = false;
        // i is shared by both loops: the first loop stops right after the row which finished the
        // request, the second loop continues from the row following it.
        int i = 0;
        for (; i < results.length; i++) {
          if (onScanNext(tableName, req, results[i])) {
            completeNormally = true;
            controller.terminate();
            i++;
            break;
          }
        }
        // Add the remaining results into cache
        if (i < results.length) {
          TableCache tableCache = getTableCache(tableName);
          for (; i < results.length; i++) {
            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
            if (locs == null) {
              continue;
            }
            HRegionLocation loc = locs.getDefaultRegionLocation();
            if (loc == null) {
              continue;
            }
            RegionInfo info = loc.getRegion();
            if (info == null || info.isOffline() || info.isSplitParent()) {
              continue;
            }
            RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
            List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
            // collect the answered requests under the lock, complete their futures outside of it
            synchronized (tableCache) {
              futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
            }
            futureResultList.forEach(RegionLocationsFutureResult::complete);
          }
        }
      }
    });
  }
492
493  private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
494    RegionLocateType locateType) {
495    return locateType.equals(RegionLocateType.BEFORE)
496      ? locateRowBeforeInCache(tableCache, row, replicaId)
497      : locateRowInCache(tableCache, row, replicaId);
498  }
499
500  // locateToPrevious is true means we will use the start key of a region to locate the region
501  // placed before it. Used for reverse scan. See the comment of
502  // AsyncRegionLocator.getPreviousRegionLocation.
503  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
504    byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
505    // AFTER should be convert to CURRENT before calling this method
506    assert !locateType.equals(RegionLocateType.AFTER);
507    TableCache tableCache = getTableCache(tableName);
508    if (!reload) {
509      RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
510      if (isGood(locs, replicaId)) {
511        return CompletableFuture.completedFuture(locs);
512      }
513    }
514    CompletableFuture<RegionLocations> future;
515    LocateRequest req;
516    boolean sendRequest = false;
517    synchronized (tableCache) {
518      // check again
519      if (!reload) {
520        RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
521        if (isGood(locs, replicaId)) {
522          return CompletableFuture.completedFuture(locs);
523        }
524      }
525      req = new LocateRequest(row, locateType);
526      future = tableCache.allRequests.get(req);
527      if (future == null) {
528        future = new CompletableFuture<>();
529        tableCache.allRequests.put(req, future);
530        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
531          tableCache.send(req);
532          sendRequest = true;
533        }
534      }
535    }
536    if (sendRequest) {
537      locateInMeta(tableName, req);
538    }
539    return future;
540  }
541
542  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
543    int replicaId, RegionLocateType locateType, boolean reload) {
544    // as we know the exact row after us, so we can just create the new row, and use the same
545    // algorithm to locate it.
546    if (locateType.equals(RegionLocateType.AFTER)) {
547      row = createClosestRowAfter(row);
548      locateType = RegionLocateType.CURRENT;
549    }
550    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
551  }
552
553  private void recordClearRegionCache() {
554    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
555  }
556
557  private void removeLocationFromCache(HRegionLocation loc) {
558    TableCache tableCache = cache.get(loc.getRegion().getTable());
559    if (tableCache != null) {
560      if (tableCache.regionLocationCache.remove(loc)) {
561        recordClearRegionCache();
562        updateMetaReplicaSelector(loc);
563      }
564    }
565  }
566
567  private void updateMetaReplicaSelector(HRegionLocation loc) {
568    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
569    // with timestamp internally. Next time the client looks up the same location,
570    // it will pick a different meta replica region.
571    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
572      metaReplicaSelector.onError(loc);
573    }
574  }
575
576  void addLocationToCache(HRegionLocation loc) {
577    getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
578  }
579
580  private HRegionLocation getCachedLocation(HRegionLocation loc) {
581    TableCache tableCache = cache.get(loc.getRegion().getTable());
582    if (tableCache == null) {
583      return null;
584    }
585    RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
586    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
587  }
588
589  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
590    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
591    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
592      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
593  }
594
595  void clearCache(TableName tableName) {
596    TableCache tableCache = cache.remove(tableName);
597    if (tableCache == null) {
598      return;
599    }
600    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
601    synchronized (tableCache) {
602      if (!tableCache.allRequests.isEmpty()) {
603        IOException error = new IOException("Cache cleared");
604        tableCache.allRequests.values().forEach(f -> {
605          futureResultList.add(new RegionLocationsFutureResult(f, null, error));
606        });
607      }
608    }
609    futureResultList.forEach(RegionLocationsFutureResult::complete);
610    conn.getConnectionMetrics().ifPresent(
611      metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
612  }
613
614  void clearCache() {
615    cache.clear();
616  }
617
618  void clearCache(ServerName serverName) {
619    for (TableCache tableCache : cache.values()) {
620      tableCache.regionLocationCache.removeForServer(serverName);
621    }
622  }
623
624  // only used for testing whether we have cached the location for a region.
625  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
626    TableCache tableCache = cache.get(tableName);
627    if (tableCache == null) {
628      return null;
629    }
630    return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
631  }
632
633  // only used for testing whether we have cached the location for a table.
634  int getNumberOfCachedRegionLocations(TableName tableName) {
635    TableCache tableCache = cache.get(tableName);
636    if (tableCache == null) {
637      return 0;
638    }
639    return tableCache.regionLocationCache.getAll().stream()
640      .mapToInt(RegionLocations::numNonNullElements).sum();
641  }
642
643  private TimerTask getInvalidateMetaCacheTask(long metaCacheInvalidateInterval) {
644    return new TimerTask() {
645      @Override
646      public void run(Timeout timeout) throws Exception {
647        FutureUtils.addListener(invalidateTableCache(), (res, err) -> {
648          if (err != null) {
649            LOG.warn("InvalidateTableCache failed.", err);
650          }
651          AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, metaCacheInvalidateInterval,
652            TimeUnit.MILLISECONDS);
653        });
654      }
655    };
656  }
657
658  private CompletableFuture<Void> invalidateTableCache() {
659    CompletableFuture<Void> future = new CompletableFuture<>();
660    Iterator<TableName> tbnIter = cache.keySet().iterator();
661    AsyncAdmin admin = conn.getAdmin();
662    invalidateCache(future, tbnIter, admin);
663    return future;
664  }
665
666  private void invalidateCache(CompletableFuture<Void> future, Iterator<TableName> tbnIter,
667    AsyncAdmin admin) {
668    if (tbnIter.hasNext()) {
669      TableName tableName = tbnIter.next();
670      FutureUtils.addListener(admin.isTableDisabled(tableName), (tableDisabled, err) -> {
671        boolean shouldInvalidateCache = false;
672        if (err != null) {
673          if (err instanceof TableNotFoundException) {
674            LOG.info("Table {} was not exist, will invalidate its cache.", tableName);
675            shouldInvalidateCache = true;
676          } else {
677            // If other exception occurred, just skip to invalidate it cache.
678            LOG.warn("Get table state of {} failed, skip to invalidate its cache.", tableName, err);
679            return;
680          }
681        } else if (tableDisabled) {
682          LOG.info("Table {} was disabled, will invalidate its cache.", tableName);
683          shouldInvalidateCache = true;
684        }
685        if (shouldInvalidateCache) {
686          clearCache(tableName);
687          LOG.info("Invalidate cache for {} succeed.", tableName);
688        } else {
689          LOG.debug("Table {} is normal, no need to invalidate its cache.", tableName);
690        }
691        invalidateCache(future, tbnIter, admin);
692      });
693    } else {
694      future.complete(null);
695    }
696  }
697}