001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
022import static org.apache.hadoop.hbase.HConstants.NINES;
023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
024import static org.apache.hadoop.hbase.HConstants.ZEROES;
025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
028import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_CACHE_INVALIDATE_INTERVAL;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
030import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
031import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
032import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
033import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
034
035import java.io.IOException;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.LinkedHashMap;
041import java.util.List;
042import java.util.Map;
043import java.util.Optional;
044import java.util.Set;
045import java.util.concurrent.CompletableFuture;
046import java.util.concurrent.ConcurrentHashMap;
047import java.util.concurrent.ConcurrentMap;
048import java.util.concurrent.TimeUnit;
049import org.apache.commons.lang3.ObjectUtils;
050import org.apache.hadoop.hbase.CatalogFamilyFormat;
051import org.apache.hadoop.hbase.CatalogReplicaMode;
052import org.apache.hadoop.hbase.HBaseIOException;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionLocation;
055import org.apache.hadoop.hbase.RegionLocations;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotFoundException;
059import org.apache.hadoop.hbase.client.Scan.ReadType;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.FutureUtils;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
067import org.apache.hbase.thirdparty.io.netty.util.Timeout;
068import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
069
070/**
071 * The asynchronous locator for regions other than meta.
072 */
073@InterfaceAudience.Private
074class AsyncNonMetaRegionLocator {
075
076  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);
077
078  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
079    "hbase.client.meta.max.concurrent.locate.per.table";
080
081  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;
082
083  static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";
084
085  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;
086
087  private final AsyncConnectionImpl conn;
088
089  private final int maxConcurrentLocateRequestPerTable;
090
091  private final int locatePrefetchLimit;
092
093  // The mode tells if HedgedRead, LoadBalance mode is supported.
094  // The default mode is CatalogReplicaMode.None.
095  private CatalogReplicaMode metaReplicaMode;
096  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
097
098  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
099
100  private final HashedWheelTimer retryTimer;
101
102  private static final class LocateRequest {
103
104    private final byte[] row;
105
106    private final RegionLocateType locateType;
107
108    public LocateRequest(byte[] row, RegionLocateType locateType) {
109      this.row = row;
110      this.locateType = locateType;
111    }
112
113    @Override
114    public int hashCode() {
115      return Bytes.hashCode(row) ^ locateType.hashCode();
116    }
117
118    @Override
119    public boolean equals(Object obj) {
120      if (obj == null || obj.getClass() != LocateRequest.class) {
121        return false;
122      }
123      LocateRequest that = (LocateRequest) obj;
124      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
125    }
126  }
127
128  private static final class RegionLocationsFutureResult {
129    private final CompletableFuture<RegionLocations> future;
130    private final RegionLocations result;
131    private final Throwable e;
132
133    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
134      RegionLocations result, Throwable e) {
135      this.future = future;
136      this.result = result;
137      this.e = e;
138    }
139
140    public void complete() {
141      if (e != null) {
142        future.completeExceptionally(e);
143      }
144      future.complete(result);
145    }
146  }
147
148  private static final class TableCache {
149
150    private final Set<LocateRequest> pendingRequests = new HashSet<>();
151
152    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
153      new LinkedHashMap<>();
154    private final AsyncRegionLocationCache regionLocationCache;
155
156    public TableCache(TableName tableName) {
157      regionLocationCache = new AsyncRegionLocationCache(tableName);
158    }
159
160    public boolean hasQuota(int max) {
161      return pendingRequests.size() < max;
162    }
163
164    public boolean isPending(LocateRequest req) {
165      return pendingRequests.contains(req);
166    }
167
168    public void send(LocateRequest req) {
169      pendingRequests.add(req);
170    }
171
172    public Optional<LocateRequest> getCandidate() {
173      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
174    }
175
176    public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
177      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
178      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
179        allRequests.entrySet().iterator(); iter.hasNext();) {
180        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
181        if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
182          iter.remove();
183        }
184      }
185      return futureResultList;
186    }
187
188    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
189      RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
190      if (future.isDone()) {
191        return true;
192      }
193      if (locations == null) {
194        return false;
195      }
196      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
197      // we should at least have one location available, otherwise the request should fail and
198      // should not arrive here
199      assert loc != null;
200      boolean completed;
201      if (req.locateType.equals(RegionLocateType.BEFORE)) {
202        // for locating the row before current row, the common case is to find the previous region
203        // in reverse scan, so we check the endKey first. In general, the condition should be
204        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
205        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
206        // endKey.
207        byte[] endKey = loc.getRegion().getEndKey();
208        int c = Bytes.compareTo(endKey, req.row);
209        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey))
210          && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
211      } else {
212        completed = loc.getRegion().containsRow(req.row);
213      }
214      if (completed) {
215        futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
216        return true;
217      } else {
218        return false;
219      }
220    }
221  }
222
223  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
224    this.conn = conn;
225    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
226      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
227    this.locatePrefetchLimit =
228      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
229
230    // Get the region locator's meta replica mode.
231    this.metaReplicaMode = CatalogReplicaMode.fromString(
232      conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
233
234    switch (this.metaReplicaMode) {
235      case LOAD_BALANCE:
236        String replicaSelectorClass =
237          conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
238            CatalogReplicaLoadBalanceSimpleSelector.class.getName());
239
240        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
241          .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> {
242            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
243            try {
244              RegionLocations metaLocations = conn.registry.getMetaRegionLocations()
245                .get(conn.connConf.getMetaReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
246              numOfReplicas = metaLocations.size();
247            } catch (Exception e) {
248              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
249            }
250            return numOfReplicas;
251          });
252        break;
253      case NONE:
254        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
255        boolean useMetaReplicas =
256          conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
257        if (useMetaReplicas) {
258          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
259        }
260        break;
261      default:
262        // Doing nothing
263    }
264
265    // The interval of invalidate meta cache task,
266    // if disable/delete table using same connection or usually create a new connection, no need to
267    // set it.
268    // Suggest set it to 24h or a higher value, because disable/delete table usually not very
269    // frequently.
270    this.retryTimer = retryTimer;
271    long metaCacheInvalidateInterval =
272      conn.getConfiguration().getLong(HBASE_CLIENT_META_CACHE_INVALIDATE_INTERVAL, 0L);
273    if (metaCacheInvalidateInterval > 0) {
274      TimerTask invalidateMetaCacheTask = getInvalidateMetaCacheTask(metaCacheInvalidateInterval);
275      this.retryTimer.newTimeout(invalidateMetaCacheTask, metaCacheInvalidateInterval,
276        TimeUnit.MILLISECONDS);
277    }
278  }
279
280  private TableCache getTableCache(TableName tableName) {
281    return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
282  }
283
284  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
285    Throwable error) {
286    if (error != null) {
287      LOG.warn("Failed to locate region in '" + tableName + "', row='"
288        + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error);
289    }
290    Optional<LocateRequest> toSend = Optional.empty();
291    TableCache tableCache = getTableCache(tableName);
292    if (locs != null) {
293      RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
294      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
295      synchronized (tableCache) {
296        tableCache.pendingRequests.remove(req);
297        futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
298        // Remove a complete locate request in a synchronized block, so the table cache must have
299        // quota to send a candidate request.
300        toSend = tableCache.getCandidate();
301        toSend.ifPresent(r -> tableCache.send(r));
302      }
303      futureResultList.forEach(RegionLocationsFutureResult::complete);
304      toSend.ifPresent(r -> locateInMeta(tableName, r));
305    } else {
306      // we meet an error
307      assert error != null;
308      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
309      synchronized (tableCache) {
310        tableCache.pendingRequests.remove(req);
311        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
312        // already retried several times
313        CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
314        if (future != null) {
315          futureResultList.add(new RegionLocationsFutureResult(future, null, error));
316        }
317        futureResultList.addAll(tableCache.clearCompletedRequests(null));
318        // Remove a complete locate request in a synchronized block, so the table cache must have
319        // quota to send a candidate request.
320        toSend = tableCache.getCandidate();
321        toSend.ifPresent(r -> tableCache.send(r));
322      }
323      futureResultList.forEach(RegionLocationsFutureResult::complete);
324      toSend.ifPresent(r -> locateInMeta(tableName, r));
325    }
326  }
327
328  // return whether we should stop the scan
329  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
330    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
331    if (LOG.isDebugEnabled()) {
332      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
333        Bytes.toStringBinary(req.row), req.locateType, locs);
334    }
335    // remove HRegionLocation with null location, i.e, getServerName returns null.
336    if (locs != null) {
337      locs = locs.removeElementsWithNullLocation();
338    }
339
340    // the default region location should always be presented when fetching from meta, otherwise
341    // let's fail the request.
342    if (locs == null || locs.getDefaultRegionLocation() == null) {
343      complete(tableName, req, null,
344        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
345          tableName, Bytes.toStringBinary(req.row), req.locateType)));
346      return true;
347    }
348    HRegionLocation loc = locs.getDefaultRegionLocation();
349    RegionInfo info = loc.getRegion();
350    if (info == null) {
351      complete(tableName, req, null,
352        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
353          tableName, Bytes.toStringBinary(req.row), req.locateType)));
354      return true;
355    }
356    if (info.isSplitParent()) {
357      return false;
358    }
359    complete(tableName, req, locs, null);
360    return true;
361  }
362
363  private void recordCacheHit() {
364    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
365  }
366
367  private void recordCacheMiss() {
368    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
369  }
370
371  private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
372    RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
373    if (locs == null) {
374      recordCacheMiss();
375    } else {
376      recordCacheHit();
377    }
378    return locs;
379  }
380
381  private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
382    RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
383    if (locs == null) {
384      recordCacheMiss();
385    } else {
386      recordCacheHit();
387    }
388    return locs;
389  }
390
391  private void locateInMeta(TableName tableName, LocateRequest req) {
392    if (LOG.isTraceEnabled()) {
393      LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row)
394        + "', locateType=" + req.locateType + " in meta");
395    }
396    byte[] metaStartKey;
397    if (req.locateType.equals(RegionLocateType.BEFORE)) {
398      if (isEmptyStopRow(req.row)) {
399        byte[] binaryTableName = tableName.getName();
400        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
401      } else {
402        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
403      }
404    } else {
405      metaStartKey = createRegionName(tableName, req.row, NINES, false);
406    }
407    byte[] metaStopKey =
408      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
409    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
410      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
411      .setReadType(ReadType.PREAD);
412
413    switch (this.metaReplicaMode) {
414      case LOAD_BALANCE:
415        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
416        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
417          // If the selector gives a non-primary meta replica region, then go with it.
418          // Otherwise, just go to primary in non-hedgedRead mode.
419          scan.setConsistency(Consistency.TIMELINE);
420          scan.setReplicaId(metaReplicaId);
421        }
422        break;
423      case HEDGED_READ:
424        scan.setConsistency(Consistency.TIMELINE);
425        break;
426      default:
427        // do nothing
428    }
429
430    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
431
432      private boolean completeNormally = false;
433
434      private boolean tableNotFound = true;
435
436      @Override
437      public void onError(Throwable error) {
438        complete(tableName, req, null, error);
439      }
440
441      @Override
442      public void onComplete() {
443        if (tableNotFound) {
444          complete(tableName, req, null, new TableNotFoundException(tableName));
445        } else if (!completeNormally) {
446          complete(tableName, req, null, new IOException(
447            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
448        }
449      }
450
451      @Override
452      public void onNext(Result[] results, ScanController controller) {
453        if (results.length == 0) {
454          return;
455        }
456        tableNotFound = false;
457        int i = 0;
458        for (; i < results.length; i++) {
459          if (onScanNext(tableName, req, results[i])) {
460            completeNormally = true;
461            controller.terminate();
462            i++;
463            break;
464          }
465        }
466        // Add the remaining results into cache
467        if (i < results.length) {
468          TableCache tableCache = getTableCache(tableName);
469          for (; i < results.length; i++) {
470            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
471            if (locs == null) {
472              continue;
473            }
474            HRegionLocation loc = locs.getDefaultRegionLocation();
475            if (loc == null) {
476              continue;
477            }
478            RegionInfo info = loc.getRegion();
479            if (info == null || info.isOffline() || info.isSplitParent()) {
480              continue;
481            }
482            RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
483            List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
484            synchronized (tableCache) {
485              futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
486            }
487            futureResultList.forEach(RegionLocationsFutureResult::complete);
488          }
489        }
490      }
491    });
492  }
493
494  private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
495    RegionLocateType locateType) {
496    return locateType.equals(RegionLocateType.BEFORE)
497      ? locateRowBeforeInCache(tableCache, row, replicaId)
498      : locateRowInCache(tableCache, row, replicaId);
499  }
500
501  // locateToPrevious is true means we will use the start key of a region to locate the region
502  // placed before it. Used for reverse scan. See the comment of
503  // AsyncRegionLocator.getPreviousRegionLocation.
504  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
505    byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
506    // AFTER should be convert to CURRENT before calling this method
507    assert !locateType.equals(RegionLocateType.AFTER);
508    TableCache tableCache = getTableCache(tableName);
509    if (!reload) {
510      RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
511      if (isGood(locs, replicaId)) {
512        return CompletableFuture.completedFuture(locs);
513      }
514    }
515    CompletableFuture<RegionLocations> future;
516    LocateRequest req;
517    boolean sendRequest = false;
518    synchronized (tableCache) {
519      // check again
520      if (!reload) {
521        RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
522        if (isGood(locs, replicaId)) {
523          return CompletableFuture.completedFuture(locs);
524        }
525      }
526      req = new LocateRequest(row, locateType);
527      future = tableCache.allRequests.get(req);
528      if (future == null) {
529        future = new CompletableFuture<>();
530        tableCache.allRequests.put(req, future);
531        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
532          tableCache.send(req);
533          sendRequest = true;
534        }
535      }
536    }
537    if (sendRequest) {
538      locateInMeta(tableName, req);
539    }
540    return future;
541  }
542
543  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
544    int replicaId, RegionLocateType locateType, boolean reload) {
545    // as we know the exact row after us, so we can just create the new row, and use the same
546    // algorithm to locate it.
547    if (locateType.equals(RegionLocateType.AFTER)) {
548      row = createClosestRowAfter(row);
549      locateType = RegionLocateType.CURRENT;
550    }
551    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
552  }
553
554  private void recordClearRegionCache() {
555    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
556  }
557
558  private void removeLocationFromCache(HRegionLocation loc) {
559    TableCache tableCache = cache.get(loc.getRegion().getTable());
560    if (tableCache != null) {
561      if (tableCache.regionLocationCache.remove(loc)) {
562        recordClearRegionCache();
563        updateMetaReplicaSelector(loc);
564      }
565    }
566  }
567
568  private void updateMetaReplicaSelector(HRegionLocation loc) {
569    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
570    // with timestamp internally. Next time the client looks up the same location,
571    // it will pick a different meta replica region.
572    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
573      metaReplicaSelector.onError(loc);
574    }
575  }
576
577  void addLocationToCache(HRegionLocation loc) {
578    getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
579  }
580
581  private HRegionLocation getCachedLocation(HRegionLocation loc) {
582    TableCache tableCache = cache.get(loc.getRegion().getTable());
583    if (tableCache == null) {
584      return null;
585    }
586    RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
587    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
588  }
589
590  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
591    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
592    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
593      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
594  }
595
596  void clearCache(TableName tableName) {
597    TableCache tableCache = cache.remove(tableName);
598    if (tableCache == null) {
599      return;
600    }
601    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
602    synchronized (tableCache) {
603      if (!tableCache.allRequests.isEmpty()) {
604        IOException error = new IOException("Cache cleared");
605        tableCache.allRequests.values().forEach(f -> {
606          futureResultList.add(new RegionLocationsFutureResult(f, null, error));
607        });
608      }
609    }
610    futureResultList.forEach(RegionLocationsFutureResult::complete);
611    conn.getConnectionMetrics().ifPresent(
612      metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
613  }
614
615  void clearCache() {
616    cache.clear();
617  }
618
619  void clearCache(ServerName serverName) {
620    for (TableCache tableCache : cache.values()) {
621      tableCache.regionLocationCache.removeForServer(serverName);
622    }
623  }
624
625  // only used for testing whether we have cached the location for a region.
626  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
627    TableCache tableCache = cache.get(tableName);
628    if (tableCache == null) {
629      return null;
630    }
631    return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
632  }
633
634  // only used for testing whether we have cached the location for a table.
635  int getNumberOfCachedRegionLocations(TableName tableName) {
636    TableCache tableCache = cache.get(tableName);
637    if (tableCache == null) {
638      return 0;
639    }
640    return tableCache.regionLocationCache.getAll().stream()
641      .mapToInt(RegionLocations::numNonNullElements).sum();
642  }
643
644  private TimerTask getInvalidateMetaCacheTask(long metaCacheInvalidateInterval) {
645    return new TimerTask() {
646      @Override
647      public void run(Timeout timeout) throws Exception {
648        FutureUtils.addListener(invalidateTableCache(), (res, err) -> {
649          if (err != null) {
650            LOG.warn("InvalidateTableCache failed.", err);
651          }
652          AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, metaCacheInvalidateInterval,
653            TimeUnit.MILLISECONDS);
654        });
655      }
656    };
657  }
658
659  private CompletableFuture<Void> invalidateTableCache() {
660    CompletableFuture<Void> future = new CompletableFuture<>();
661    Iterator<TableName> tbnIter = cache.keySet().iterator();
662    AsyncAdmin admin = conn.getAdmin();
663    invalidateCache(future, tbnIter, admin);
664    return future;
665  }
666
667  private void invalidateCache(CompletableFuture<Void> future, Iterator<TableName> tbnIter,
668    AsyncAdmin admin) {
669    if (tbnIter.hasNext()) {
670      TableName tableName = tbnIter.next();
671      FutureUtils.addListener(admin.isTableDisabled(tableName), (tableDisabled, err) -> {
672        boolean shouldInvalidateCache = false;
673        if (err != null) {
674          if (err instanceof TableNotFoundException) {
675            LOG.info("Table {} was not exist, will invalidate its cache.", tableName);
676            shouldInvalidateCache = true;
677          } else {
678            // If other exception occurred, just skip to invalidate it cache.
679            LOG.warn("Get table state of {} failed, skip to invalidate its cache.", tableName, err);
680            return;
681          }
682        } else if (tableDisabled) {
683          LOG.info("Table {} was disabled, will invalidate its cache.", tableName);
684          shouldInvalidateCache = true;
685        }
686        if (shouldInvalidateCache) {
687          clearCache(tableName);
688          LOG.info("Invalidate cache for {} succeed.", tableName);
689        } else {
690          LOG.debug("Table {} is normal, no need to invalidate its cache.", tableName);
691        }
692        invalidateCache(future, tbnIter, admin);
693      });
694    } else {
695      future.complete(null);
696    }
697  }
698}