/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;

/**
 * The asynchronous locator for regions other than meta.
 * <p>
 * Locations are kept in a per-table {@link TableCache}. A lookup that cannot be answered from the
 * cache is turned into a {@link LocateRequest} and resolved with a reversed scan over the meta
 * table; the number of meta scans in flight per table is bounded by
 * {@link #MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE}.
 */
@InterfaceAudience.Private
class AsyncNonMetaRegionLocator {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class);

  /** Config key: maximum number of meta scans that may be in flight for one table. */
  static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE =
    "hbase.client.meta.max.concurrent.locate.per.table";

  private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8;

  /** Config key: value passed to {@code Scan.setCaching} for the meta scan. */
  static final String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit";

  private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10;

  private final AsyncConnectionImpl conn;

  private final int maxConcurrentLocateRequestPerTable;

  private final int locatePrefetchLimit;

  // The mode tells if HedgedRead, LoadBalance mode is supported.
  // The default mode is CatalogReplicaMode.None.
  private CatalogReplicaMode metaReplicaMode;
  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;

  private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();

  private final HashedWheelTimer retryTimer;

  /**
   * Key identifying one outstanding lookup: the row and how it should be located. Two requests
   * are equal when both the row bytes and the locate type are equal.
   */
  private static final class LocateRequest {

    private final byte[] row;

    private final RegionLocateType locateType;

    public LocateRequest(byte[] row, RegionLocateType locateType) {
      this.row = row;
      this.locateType = locateType;
    }

    @Override
    public int hashCode() {
      return Bytes.hashCode(row) ^ locateType.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == null || obj.getClass() != LocateRequest.class) {
        return false;
      }
      LocateRequest that = (LocateRequest) obj;
      return locateType.equals(that.locateType) && Bytes.equals(row, that.row);
    }
  }

  /**
   * Pairs a caller's future with the outcome it should receive. Instances are collected while
   * holding the table cache lock and completed afterwards, so that future callbacks do not run
   * under the lock.
   */
  private static final class RegionLocationsFutureResult {
    private final CompletableFuture<RegionLocations> future;
    private final RegionLocations result;
    private final Throwable e;

    public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
      RegionLocations result, Throwable e) {
      this.future = future;
      this.result = result;
      this.e = e;
    }

    /** Completes the future exceptionally if an error was recorded, otherwise with the result. */
    public void complete() {
      if (e != null) {
        future.completeExceptionally(e);
      } else {
        future.complete(result);
      }
    }
  }

  /**
   * Per-table state: the cached locations plus the bookkeeping of outstanding locate requests.
   * {@code pendingRequests} and {@code allRequests} are plain collections; the code in this class
   * accesses them while synchronized on the {@code TableCache} instance.
   */
  private static final class TableCache {

    // requests for which a meta scan has been sent and has not finished yet
    private final Set<LocateRequest> pendingRequests = new HashSet<>();

    // every request whose future has not been completed yet, in arrival order
    private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
      new LinkedHashMap<>();
    private final AsyncRegionLocationCache regionLocationCache;

    public TableCache(TableName tableName) {
      regionLocationCache = new AsyncRegionLocationCache(tableName);
    }

    /** Returns whether fewer than {@code max} requests are currently pending. */
    public boolean hasQuota(int max) {
      return pendingRequests.size() < max;
    }

    public boolean isPending(LocateRequest req) {
      return pendingRequests.contains(req);
    }

    /** Marks the request as pending; the caller is responsible for actually issuing the scan. */
    public void send(LocateRequest req) {
      pendingRequests.add(req);
    }

    /** Returns the oldest registered request that is not pending yet, if any. */
    public Optional<LocateRequest> getCandidate() {
      return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
    }

    /**
     * Removes from {@code allRequests} every request that is already done or that is answered by
     * {@code locations}.
     * @param locations newly cached locations, or {@code null} to only drop requests whose future
     *                  is already done
     * @return the futures to complete with {@code locations}; the caller completes them
     */
    public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
      List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
      for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
        allRequests.entrySet().iterator(); iter.hasNext();) {
        Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
        if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
          iter.remove();
        }
      }
      return futureResultList;
    }

    /**
     * Decides whether {@code req} can be dropped from the outstanding requests.
     * @return {@code true} if the future is already done, or if {@code locations} answers the
     *         request (in which case a result is appended to {@code futureResultList})
     */
    private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
      RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
      if (future.isDone()) {
        return true;
      }
      if (locations == null) {
        return false;
      }
      HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
      // we should at least have one location available, otherwise the request should fail and
      // should not arrive here
      assert loc != null;
      boolean completed;
      if (req.locateType.equals(RegionLocateType.BEFORE)) {
        // for locating the row before current row, the common case is to find the previous region
        // in reverse scan, so we check the endKey first. In general, the condition should be
        // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row ||
        // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey <
        // endKey.
        byte[] endKey = loc.getRegion().getEndKey();
        int c = Bytes.compareTo(endKey, req.row);
        completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey))
          && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0);
      } else {
        completed = loc.getRegion().containsRow(req.row);
      }
      if (completed) {
        futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
        return true;
      } else {
        return false;
      }
    }
  }

  /**
   * Reads the locator settings from the connection's configuration and, when
   * {@code hbase.client.connection.metacache.invalidate-interval.ms} is positive, schedules the
   * periodic meta cache invalidation task on {@code retryTimer}.
   * @param conn       the owning connection
   * @param retryTimer timer used to schedule the periodic cache invalidation task
   */
  AsyncNonMetaRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
    this.conn = conn;
    this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt(
      MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
    this.locatePrefetchLimit =
      conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);

    // Get the region locator's meta replica mode.
    this.metaReplicaMode = CatalogReplicaMode.fromString(
      conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        String replicaSelectorClass =
          conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
            CatalogReplicaLoadBalanceSimpleSelector.class.getName());

        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
          .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> {
            int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
            try {
              RegionLocations metaLocations = conn.registry.getMetaRegionLocations()
                .get(conn.connConf.getMetaReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
              numOfReplicas = metaLocations.size();
            } catch (Exception e) {
              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
            }
            return numOfReplicas;
          });
        break;
      case NONE:
        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
        boolean useMetaReplicas =
          conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
        if (useMetaReplicas) {
          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
        }
        break;
      default:
        // Doing nothing
    }

    // The interval of the invalidate meta cache task.
    // If tables are disabled/deleted through this same connection, or connections are recreated
    // regularly, there is no need to set it.
    // Suggest setting it to 24h or a higher value, because disabling/deleting a table is usually
    // not very frequent.
    this.retryTimer = retryTimer;
    long metaCacheInvalidateInterval = conn.getConfiguration()
      .getLong("hbase.client.connection.metacache.invalidate-interval.ms", 0L);
    if (metaCacheInvalidateInterval > 0) {
      TimerTask invalidateMetaCacheTask = getInvalidateMetaCacheTask(metaCacheInvalidateInterval);
      this.retryTimer.newTimeout(invalidateMetaCacheTask, metaCacheInvalidateInterval,
        TimeUnit.MILLISECONDS);
    }
  }

  /** Returns the cache for the table, creating it if it does not exist yet. */
  private TableCache getTableCache(TableName tableName) {
    return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
  }

  /**
   * Finishes a locate request that was sent to meta.
   * <p>
   * On success ({@code locs != null}) the locations are added to the cache and every outstanding
   * request they answer is completed. On failure the request's own future is failed with
   * {@code error}. In both cases the request is no longer pending, so the next not-yet-sent
   * request (if any) is sent to meta.
   * @param locs  the located regions, or {@code null} if the lookup failed
   * @param error the failure; expected to be non-null when {@code locs} is {@code null}
   */
  private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
    Throwable error) {
    if (error != null) {
      LOG.warn("Failed to locate region in '{}', row='{}', locateType={}", tableName,
        Bytes.toStringBinary(req.row), req.locateType, error);
    }
    TableCache tableCache = getTableCache(tableName);
    boolean failed = locs == null;
    RegionLocations addedLocs = null;
    if (failed) {
      // we meet an error
      assert error != null;
    } else {
      // the location cache is updated outside the table cache lock
      addedLocs = tableCache.regionLocationCache.add(locs);
    }
    Optional<LocateRequest> toSend;
    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
    synchronized (tableCache) {
      tableCache.pendingRequests.remove(req);
      if (failed) {
        // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
        // already retried several times
        CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
        if (future != null) {
          futureResultList.add(new RegionLocationsFutureResult(future, null, error));
        }
      }
      futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
      // Remove a complete locate request in a synchronized block, so the table cache must have
      // quota to send a candidate request.
      toSend = tableCache.getCandidate();
      toSend.ifPresent(tableCache::send);
    }
    // complete the futures and issue the next scan only after the lock has been released
    futureResultList.forEach(RegionLocationsFutureResult::complete);
    toSend.ifPresent(r -> locateInMeta(tableName, r));
  }

  /**
   * Handles one row returned by the meta scan for {@code req}.
   * @return whether we should stop the scan
   */
  private boolean onScanNext(TableName tableName, LocateRequest req, Result result) {
    RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result);
    if (LOG.isDebugEnabled()) {
      LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName,
        Bytes.toStringBinary(req.row), req.locateType, locs);
    }
    // remove HRegionLocation with null location, i.e, getServerName returns null.
    if (locs != null) {
      locs = locs.removeElementsWithNullLocation();
    }

    // the default region location should always be presented when fetching from meta, otherwise
    // let's fail the request.
    if (locs == null || locs.getDefaultRegionLocation() == null) {
      complete(tableName, req, null,
        new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s",
          tableName, Bytes.toStringBinary(req.row), req.locateType)));
      return true;
    }
    HRegionLocation loc = locs.getDefaultRegionLocation();
    RegionInfo info = loc.getRegion();
    if (info == null) {
      complete(tableName, req, null,
        new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s",
          tableName, Bytes.toStringBinary(req.row), req.locateType)));
      return true;
    }
    if (info.isSplitParent()) {
      // skip split parents and keep scanning
      return false;
    }
    complete(tableName, req, locs, null);
    return true;
  }

  private void recordCacheHit() {
    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
  }

  private void recordCacheMiss() {
    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
  }

  /** Looks up the region containing {@code row} in the cache and records a cache hit or miss. */
  private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
    RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
    if (locs == null) {
      recordCacheMiss();
    } else {
      recordCacheHit();
    }
    return locs;
  }

  /** Looks up the region before {@code row} in the cache and records a cache hit or miss. */
  private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
    RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
    if (locs == null) {
      recordCacheMiss();
    } else {
      recordCacheHit();
    }
    return locs;
  }

  /**
   * Issues a reversed scan over the meta table to resolve {@code req}. The outcome is reported
   * through {@link #complete(TableName, LocateRequest, RegionLocations, Throwable)}; additional
   * rows returned in the same batch are added to the cache as well.
   */
  private void locateInMeta(TableName tableName, LocateRequest req) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Try locate '{}', row='{}', locateType={} in meta", tableName,
        Bytes.toStringBinary(req.row), req.locateType);
    }
    byte[] metaStartKey;
    if (req.locateType.equals(RegionLocateType.BEFORE)) {
      if (isEmptyStopRow(req.row)) {
        // the table name followed by a single zero byte
        byte[] binaryTableName = tableName.getName();
        metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1);
      } else {
        metaStartKey = createRegionName(tableName, req.row, ZEROES, false);
      }
    } else {
      metaStartKey = createRegionName(tableName, req.row, NINES, false);
    }
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
      .setReadType(ReadType.PREAD);

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // If the selector gives a non-primary meta replica region, then go with it.
          // Otherwise, just go to primary in non-hedgedRead mode.
          scan.setConsistency(Consistency.TIMELINE);
          scan.setReplicaId(metaReplicaId);
        }
        break;
      case HEDGED_READ:
        scan.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // do nothing
    }

    conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {

      // set once onScanNext has reported an outcome for the request
      private boolean completeNormally = false;

      // stays true until the scan returns at least one row
      private boolean tableNotFound = true;

      @Override
      public void onError(Throwable error) {
        complete(tableName, req, null, error);
      }

      @Override
      public void onComplete() {
        if (tableNotFound) {
          complete(tableName, req, null, new TableNotFoundException(tableName));
        } else if (!completeNormally) {
          complete(tableName, req, null, new IOException(
            "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
        }
      }

      @Override
      public void onNext(Result[] results, ScanController controller) {
        if (results.length == 0) {
          return;
        }
        tableNotFound = false;
        int i = 0;
        for (; i < results.length; i++) {
          if (onScanNext(tableName, req, results[i])) {
            completeNormally = true;
            controller.terminate();
            // step past the row that finished the request
            i++;
            break;
          }
        }
        // Add the remaining results into cache
        if (i < results.length) {
          TableCache tableCache = getTableCache(tableName);
          for (; i < results.length; i++) {
            RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]);
            if (locs == null) {
              continue;
            }
            HRegionLocation loc = locs.getDefaultRegionLocation();
            if (loc == null) {
              continue;
            }
            RegionInfo info = loc.getRegion();
            if (info == null || info.isOffline() || info.isSplitParent()) {
              continue;
            }
            RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
            List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
            synchronized (tableCache) {
              futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
            }
            futureResultList.forEach(RegionLocationsFutureResult::complete);
          }
        }
      }
    });
  }

  /** Dispatches to the BEFORE or the containing-row cache lookup depending on the locate type. */
  private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
    RegionLocateType locateType) {
    return locateType.equals(RegionLocateType.BEFORE)
      ? locateRowBeforeInCache(tableCache, row, replicaId)
      : locateRowInCache(tableCache, row, replicaId);
  }

  /**
   * Locates the region for {@code row}, answering from the cache when possible and otherwise
   * registering a locate request against meta.
   * <p>
   * With {@link RegionLocateType#BEFORE} the region placed before {@code row} is located. Used
   * for reverse scan. See the comment of AsyncRegionLocator.getPreviousRegionLocation.
   * @param locateType must not be {@link RegionLocateType#AFTER}; callers convert it to CURRENT
   * @param reload     if {@code true}, the cache is not consulted
   * @return a future for the locations; requests equal to an outstanding one share its future
   */
  private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName,
    byte[] row, int replicaId, RegionLocateType locateType, boolean reload) {
    // AFTER should be convert to CURRENT before calling this method
    assert !locateType.equals(RegionLocateType.AFTER);
    TableCache tableCache = getTableCache(tableName);
    if (!reload) {
      RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
      if (isGood(locs, replicaId)) {
        return CompletableFuture.completedFuture(locs);
      }
    }
    CompletableFuture<RegionLocations> future;
    LocateRequest req;
    boolean sendRequest = false;
    synchronized (tableCache) {
      // check again
      if (!reload) {
        RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
        if (isGood(locs, replicaId)) {
          return CompletableFuture.completedFuture(locs);
        }
      }
      req = new LocateRequest(row, locateType);
      future = tableCache.allRequests.get(req);
      if (future == null) {
        future = new CompletableFuture<>();
        tableCache.allRequests.put(req, future);
        if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) {
          tableCache.send(req);
          sendRequest = true;
        }
      }
    }
    // issue the meta scan after the lock has been released
    if (sendRequest) {
      locateInMeta(tableName, req);
    }
    return future;
  }

  /**
   * Locates the region for {@code row} according to {@code locateType}.
   * @param reload if {@code true}, the cache is not consulted
   */
  CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
    int replicaId, RegionLocateType locateType, boolean reload) {
    // as we know the exact row after us, so we can just create the new row, and use the same
    // algorithm to locate it.
    if (locateType.equals(RegionLocateType.AFTER)) {
      row = createClosestRowAfter(row);
      locateType = RegionLocateType.CURRENT;
    }
    return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload);
  }

  private void recordClearRegionCache() {
    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
  }

  /** Removes the location from its table's cache, if that table has a cache. */
  private void removeLocationFromCache(HRegionLocation loc) {
    TableCache tableCache = cache.get(loc.getRegion().getTable());
    if (tableCache != null) {
      if (tableCache.regionLocationCache.remove(loc)) {
        recordClearRegionCache();
        updateMetaReplicaSelector(loc);
      }
    }
  }

  private void updateMetaReplicaSelector(HRegionLocation loc) {
    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
    // with timestamp internally. Next time the client looks up the same location,
    // it will pick a different meta replica region.
    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
      metaReplicaSelector.onError(loc);
    }
  }

  /** Adds the location to its table's cache, creating the table cache if necessary. */
  void addLocationToCache(HRegionLocation loc) {
    getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
  }

  /**
   * Returns the cached location for the same start key and replica id as {@code loc}, or
   * {@code null} if nothing is cached for it.
   */
  private HRegionLocation getCachedLocation(HRegionLocation loc) {
    TableCache tableCache = cache.get(loc.getRegion().getTable());
    if (tableCache == null) {
      return null;
    }
    RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
    return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
  }

  /**
   * Delegates to {@link AsyncRegionLocatorHelper#updateCachedLocationOnError}, supplying this
   * locator's cache accessors and the connection metrics (or {@code null} if there are none).
   */
  void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
    Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
    AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
      this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
  }

  /**
   * Drops the cache of the given table and fails all of its outstanding locate requests with an
   * {@link IOException}.
   */
  void clearCache(TableName tableName) {
    TableCache tableCache = cache.remove(tableName);
    if (tableCache == null) {
      return;
    }
    List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
    synchronized (tableCache) {
      if (!tableCache.allRequests.isEmpty()) {
        IOException error = new IOException("Cache cleared");
        tableCache.allRequests.values().forEach(f -> {
          futureResultList.add(new RegionLocationsFutureResult(f, null, error));
        });
      }
    }
    futureResultList.forEach(RegionLocationsFutureResult::complete);
    conn.getConnectionMetrics().ifPresent(
      metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
  }

  /**
   * Drops the caches of all tables.
   * <p>
   * NOTE(review): unlike {@link #clearCache(TableName)}, this neither fails the outstanding
   * locate requests of the dropped table caches nor updates metrics - confirm this is intended.
   */
  void clearCache() {
    cache.clear();
  }

  /** Removes the cached locations that point at the given server, for every cached table. */
  void clearCache(ServerName serverName) {
    for (TableCache tableCache : cache.values()) {
      tableCache.regionLocationCache.removeForServer(serverName);
    }
  }

  // only used for testing whether we have cached the location for a region.
  RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
    TableCache tableCache = cache.get(tableName);
    if (tableCache == null) {
      return null;
    }
    return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  // only used for testing whether we have cached the location for a table.
  int getNumberOfCachedRegionLocations(TableName tableName) {
    TableCache tableCache = cache.get(tableName);
    if (tableCache == null) {
      return 0;
    }
    return tableCache.regionLocationCache.getAll().stream()
      .mapToInt(RegionLocations::numNonNullElements).sum();
  }

  /**
   * Creates the self-rescheduling task that invalidates the caches of disabled or deleted tables.
   * After each pass, whether it succeeded or failed, the task schedules itself again on the timer
   * this locator was constructed with.
   * @param metaCacheInvalidateInterval delay between two passes, in milliseconds
   */
  private TimerTask getInvalidateMetaCacheTask(long metaCacheInvalidateInterval) {
    return new TimerTask() {
      @Override
      public void run(Timeout timeout) throws Exception {
        FutureUtils.addListener(invalidateTableCache(), (res, err) -> {
          if (err != null) {
            LOG.warn("InvalidateTableCache failed.", err);
          }
          // "this" is the enclosing TimerTask; reschedule on the same timer that was used for
          // the initial scheduling in the constructor
          retryTimer.newTimeout(this, metaCacheInvalidateInterval, TimeUnit.MILLISECONDS);
        });
      }
    };
  }

  /**
   * Checks every table that currently has a cache and clears the cache of those that are disabled
   * or no longer exist.
   * @return a future completed once all tables have been checked
   */
  private CompletableFuture<Void> invalidateTableCache() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    Iterator<TableName> tbnIter = cache.keySet().iterator();
    AsyncAdmin admin = conn.getAdmin();
    invalidateCache(future, tbnIter, admin);
    return future;
  }

  /**
   * Processes the next table of {@code tbnIter} and chains itself for the remaining ones;
   * completes {@code future} when the iterator is exhausted. A failure to get the state of one
   * table only skips that table: the remaining tables are still checked, so {@code future} is
   * always completed eventually.
   */
  private void invalidateCache(CompletableFuture<Void> future, Iterator<TableName> tbnIter,
    AsyncAdmin admin) {
    if (!tbnIter.hasNext()) {
      future.complete(null);
      return;
    }
    TableName tableName = tbnIter.next();
    FutureUtils.addListener(admin.isTableDisabled(tableName), (tableDisabled, err) -> {
      boolean shouldInvalidateCache = false;
      if (err != null) {
        if (err instanceof TableNotFoundException) {
          LOG.info("Table {} was not exist, will invalidate its cache.", tableName);
          shouldInvalidateCache = true;
        } else {
          // If other exception occurred, just skip invalidating this table's cache, but keep
          // going with the remaining tables. Returning without continuing would leave the
          // future uncompleted forever and the periodic task would never be rescheduled.
          LOG.warn("Get table state of {} failed, skip to invalidate its cache.", tableName, err);
          invalidateCache(future, tbnIter, admin);
          return;
        }
      } else if (tableDisabled) {
        LOG.info("Table {} was disabled, will invalidate its cache.", tableName);
        shouldInvalidateCache = true;
      }
      if (shouldInvalidateCache) {
        clearCache(tableName);
        LOG.info("Invalidate cache for {} succeed.", tableName);
      } else {
        LOG.debug("Table {} is normal, no need to invalidate its cache.", tableName);
      }
      invalidateCache(future, tbnIter, admin);
    });
  }
}