001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.NINES; 023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 024import static org.apache.hadoop.hbase.HConstants.ZEROES; 025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; 027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; 028import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_CACHE_INVALIDATE_INTERVAL; 029import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; 030import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 031import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; 032import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 033import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 034 035import java.io.IOException; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.LinkedHashMap; 041import java.util.List; 042import java.util.Map; 043import java.util.Optional; 044import java.util.Set; 045import java.util.concurrent.CompletableFuture; 046import java.util.concurrent.ConcurrentHashMap; 047import java.util.concurrent.ConcurrentMap; 048import java.util.concurrent.TimeUnit; 049import org.apache.commons.lang3.ObjectUtils; 050import org.apache.hadoop.hbase.CatalogFamilyFormat; 051import org.apache.hadoop.hbase.CatalogReplicaMode; 052import org.apache.hadoop.hbase.HBaseIOException; 053import org.apache.hadoop.hbase.HConstants; 054import org.apache.hadoop.hbase.HRegionLocation; 055import org.apache.hadoop.hbase.RegionLocations; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNotFoundException; 059import org.apache.hadoop.hbase.client.Scan.ReadType; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.FutureUtils; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 067import org.apache.hbase.thirdparty.io.netty.util.Timeout; 068import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 069 070/** 071 * The asynchronous locator for regions other than meta. 072 */ 073@InterfaceAudience.Private 074class AsyncNonMetaRegionLocator { 075 076 private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class); 077 078 static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 079 "hbase.client.meta.max.concurrent.locate.per.table"; 080 081 private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; 082 083 static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit"; 084 085 private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10; 086 087 private final AsyncConnectionImpl conn; 088 089 private final int maxConcurrentLocateRequestPerTable; 090 091 private final int locatePrefetchLimit; 092 093 // The mode tells if HedgedRead, LoadBalance mode is supported. 094 // The default mode is CatalogReplicaMode.None. 095 private CatalogReplicaMode metaReplicaMode; 096 private CatalogReplicaLoadBalanceSelector metaReplicaSelector; 097 098 private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); 099 100 private final HashedWheelTimer retryTimer; 101 102 private static final class LocateRequest { 103 104 private final byte[] row; 105 106 private final RegionLocateType locateType; 107 108 public LocateRequest(byte[] row, RegionLocateType locateType) { 109 this.row = row; 110 this.locateType = locateType; 111 } 112 113 @Override 114 public int hashCode() { 115 return Bytes.hashCode(row) ^ locateType.hashCode(); 116 } 117 118 @Override 119 public boolean equals(Object obj) { 120 if (obj == null || obj.getClass() != LocateRequest.class) { 121 return false; 122 } 123 LocateRequest that = (LocateRequest) obj; 124 return locateType.equals(that.locateType) && Bytes.equals(row, that.row); 125 } 126 } 127 128 private static final class RegionLocationsFutureResult { 129 private final CompletableFuture<RegionLocations> future; 130 private final RegionLocations result; 131 private final Throwable e; 132 133 public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future, 134 RegionLocations result, Throwable e) { 135 this.future = future; 136 this.result = result; 137 this.e = e; 138 } 139 140 public void complete() { 141 if (e != null) { 142 future.completeExceptionally(e); 143 } 144 future.complete(result); 145 } 146 } 147 148 private static final class TableCache { 149 150 private final Set<LocateRequest> pendingRequests = new HashSet<>(); 151 152 private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests = 153 new LinkedHashMap<>(); 154 private final AsyncRegionLocationCache regionLocationCache; 155 156 public TableCache(TableName tableName) { 157 regionLocationCache = new AsyncRegionLocationCache(tableName); 158 } 159 160 public boolean hasQuota(int max) { 161 return pendingRequests.size() < max; 162 } 163 164 public boolean isPending(LocateRequest req) { 165 return pendingRequests.contains(req); 166 } 167 168 public void send(LocateRequest req) { 169 pendingRequests.add(req); 170 } 171 172 public Optional<LocateRequest> getCandidate() { 173 return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); 174 } 175 176 public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) { 177 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 178 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter = 179 allRequests.entrySet().iterator(); iter.hasNext();) { 180 Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next(); 181 if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) { 182 iter.remove(); 183 } 184 } 185 return futureResultList; 186 } 187 188 private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future, 189 RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) { 190 if (future.isDone()) { 191 return true; 192 } 193 if (locations == null) { 194 return false; 195 } 196 HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations()); 197 // we should at least have one location available, otherwise the request should fail and 198 // should not arrive here 199 assert loc != null; 200 boolean completed; 201 if (req.locateType.equals(RegionLocateType.BEFORE)) { 202 // for locating the row before current row, the common case is to find the previous region 203 // in reverse scan, so we check the endKey first. In general, the condition should be 204 // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row || 205 // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey < 206 // endKey. 207 byte[] endKey = loc.getRegion().getEndKey(); 208 int c = Bytes.compareTo(endKey, req.row); 209 completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) 210 && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0); 211 } else { 212 completed = loc.getRegion().containsRow(req.row); 213 } 214 if (completed) { 215 futureResultList.add(new RegionLocationsFutureResult(future, locations, null)); 216 return true; 217 } else { 218 return false; 219 } 220 } 221 } 222 223 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { 224 this.conn = conn; 225 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( 226 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); 227 this.locatePrefetchLimit = 228 conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); 229 230 // Get the region locator's meta replica mode. 231 this.metaReplicaMode = CatalogReplicaMode.fromString( 232 conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); 233 234 switch (this.metaReplicaMode) { 235 case LOAD_BALANCE: 236 String replicaSelectorClass = 237 conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR, 238 CatalogReplicaLoadBalanceSimpleSelector.class.getName()); 239 240 this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory 241 .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> { 242 int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS; 243 try { 244 RegionLocations metaLocations = conn.registry.getMetaRegionLocations() 245 .get(conn.connConf.getMetaReadRpcTimeoutNs(), TimeUnit.NANOSECONDS); 246 numOfReplicas = metaLocations.size(); 247 } catch (Exception e) { 248 LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e); 249 } 250 return numOfReplicas; 251 }); 252 break; 253 case NONE: 254 // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config. 255 boolean useMetaReplicas = 256 conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS); 257 if (useMetaReplicas) { 258 this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ; 259 } 260 break; 261 default: 262 // Doing nothing 263 } 264 265 // The interval of invalidate meta cache task, 266 // if disable/delete table using same connection or usually create a new connection, no need to 267 // set it. 268 // Suggest set it to 24h or a higher value, because disable/delete table usually not very 269 // frequently. 270 this.retryTimer = retryTimer; 271 long metaCacheInvalidateInterval = 272 conn.getConfiguration().getLong(HBASE_CLIENT_META_CACHE_INVALIDATE_INTERVAL, 0L); 273 if (metaCacheInvalidateInterval > 0) { 274 TimerTask invalidateMetaCacheTask = getInvalidateMetaCacheTask(metaCacheInvalidateInterval); 275 this.retryTimer.newTimeout(invalidateMetaCacheTask, metaCacheInvalidateInterval, 276 TimeUnit.MILLISECONDS); 277 } 278 } 279 280 private TableCache getTableCache(TableName tableName) { 281 return computeIfAbsent(cache, tableName, () -> new TableCache(tableName)); 282 } 283 284 private void complete(TableName tableName, LocateRequest req, RegionLocations locs, 285 Throwable error) { 286 if (error != null) { 287 LOG.warn("Failed to locate region in '" + tableName + "', row='" 288 + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); 289 } 290 Optional<LocateRequest> toSend = Optional.empty(); 291 TableCache tableCache = getTableCache(tableName); 292 if (locs != null) { 293 RegionLocations addedLocs = tableCache.regionLocationCache.add(locs); 294 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 295 synchronized (tableCache) { 296 tableCache.pendingRequests.remove(req); 297 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 298 // Remove a complete locate request in a synchronized block, so the table cache must have 299 // quota to send a candidate request. 300 toSend = tableCache.getCandidate(); 301 toSend.ifPresent(r -> tableCache.send(r)); 302 } 303 futureResultList.forEach(RegionLocationsFutureResult::complete); 304 toSend.ifPresent(r -> locateInMeta(tableName, r)); 305 } else { 306 // we meet an error 307 assert error != null; 308 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 309 synchronized (tableCache) { 310 tableCache.pendingRequests.remove(req); 311 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have 312 // already retried several times 313 CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req); 314 if (future != null) { 315 futureResultList.add(new RegionLocationsFutureResult(future, null, error)); 316 } 317 futureResultList.addAll(tableCache.clearCompletedRequests(null)); 318 // Remove a complete locate request in a synchronized block, so the table cache must have 319 // quota to send a candidate request. 320 toSend = tableCache.getCandidate(); 321 toSend.ifPresent(r -> tableCache.send(r)); 322 } 323 futureResultList.forEach(RegionLocationsFutureResult::complete); 324 toSend.ifPresent(r -> locateInMeta(tableName, r)); 325 } 326 } 327 328 // return whether we should stop the scan 329 private boolean onScanNext(TableName tableName, LocateRequest req, Result result) { 330 RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result); 331 if (LOG.isDebugEnabled()) { 332 LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, 333 Bytes.toStringBinary(req.row), req.locateType, locs); 334 } 335 // remove HRegionLocation with null location, i.e, getServerName returns null. 336 if (locs != null) { 337 locs = locs.removeElementsWithNullLocation(); 338 } 339 340 // the default region location should always be presented when fetching from meta, otherwise 341 // let's fail the request. 342 if (locs == null || locs.getDefaultRegionLocation() == null) { 343 complete(tableName, req, null, 344 new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", 345 tableName, Bytes.toStringBinary(req.row), req.locateType))); 346 return true; 347 } 348 HRegionLocation loc = locs.getDefaultRegionLocation(); 349 RegionInfo info = loc.getRegion(); 350 if (info == null) { 351 complete(tableName, req, null, 352 new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", 353 tableName, Bytes.toStringBinary(req.row), req.locateType))); 354 return true; 355 } 356 if (info.isSplitParent()) { 357 return false; 358 } 359 complete(tableName, req, locs, null); 360 return true; 361 } 362 363 private void recordCacheHit() { 364 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit); 365 } 366 367 private void recordCacheMiss() { 368 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); 369 } 370 371 private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) { 372 RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId); 373 if (locs == null) { 374 recordCacheMiss(); 375 } else { 376 recordCacheHit(); 377 } 378 return locs; 379 } 380 381 private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) { 382 RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId); 383 if (locs == null) { 384 recordCacheMiss(); 385 } else { 386 recordCacheHit(); 387 } 388 return locs; 389 } 390 391 private void locateInMeta(TableName tableName, LocateRequest req) { 392 if (LOG.isTraceEnabled()) { 393 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) 394 + "', locateType=" + req.locateType + " in meta"); 395 } 396 byte[] metaStartKey; 397 if (req.locateType.equals(RegionLocateType.BEFORE)) { 398 if (isEmptyStopRow(req.row)) { 399 byte[] binaryTableName = tableName.getName(); 400 metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); 401 } else { 402 metaStartKey = createRegionName(tableName, req.row, ZEROES, false); 403 } 404 } else { 405 metaStartKey = createRegionName(tableName, req.row, NINES, false); 406 } 407 byte[] metaStopKey = 408 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 409 Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 410 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) 411 .setReadType(ReadType.PREAD); 412 413 switch (this.metaReplicaMode) { 414 case LOAD_BALANCE: 415 int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType); 416 if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { 417 // If the selector gives a non-primary meta replica region, then go with it. 418 // Otherwise, just go to primary in non-hedgedRead mode. 419 scan.setConsistency(Consistency.TIMELINE); 420 scan.setReplicaId(metaReplicaId); 421 } 422 break; 423 case HEDGED_READ: 424 scan.setConsistency(Consistency.TIMELINE); 425 break; 426 default: 427 // do nothing 428 } 429 430 conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() { 431 432 private boolean completeNormally = false; 433 434 private boolean tableNotFound = true; 435 436 @Override 437 public void onError(Throwable error) { 438 complete(tableName, req, null, error); 439 } 440 441 @Override 442 public void onComplete() { 443 if (tableNotFound) { 444 complete(tableName, req, null, new TableNotFoundException(tableName)); 445 } else if (!completeNormally) { 446 complete(tableName, req, null, new IOException( 447 "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName)); 448 } 449 } 450 451 @Override 452 public void onNext(Result[] results, ScanController controller) { 453 if (results.length == 0) { 454 return; 455 } 456 tableNotFound = false; 457 int i = 0; 458 for (; i < results.length; i++) { 459 if (onScanNext(tableName, req, results[i])) { 460 completeNormally = true; 461 controller.terminate(); 462 i++; 463 break; 464 } 465 } 466 // Add the remaining results into cache 467 if (i < results.length) { 468 TableCache tableCache = getTableCache(tableName); 469 for (; i < results.length; i++) { 470 RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]); 471 if (locs == null) { 472 continue; 473 } 474 HRegionLocation loc = locs.getDefaultRegionLocation(); 475 if (loc == null) { 476 continue; 477 } 478 RegionInfo info = loc.getRegion(); 479 if (info == null || info.isOffline() || info.isSplitParent()) { 480 continue; 481 } 482 RegionLocations addedLocs = tableCache.regionLocationCache.add(locs); 483 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 484 synchronized (tableCache) { 485 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 486 } 487 futureResultList.forEach(RegionLocationsFutureResult::complete); 488 } 489 } 490 } 491 }); 492 } 493 494 private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId, 495 RegionLocateType locateType) { 496 return locateType.equals(RegionLocateType.BEFORE) 497 ? locateRowBeforeInCache(tableCache, row, replicaId) 498 : locateRowInCache(tableCache, row, replicaId); 499 } 500 501 // locateToPrevious is true means we will use the start key of a region to locate the region 502 // placed before it. Used for reverse scan. See the comment of 503 // AsyncRegionLocator.getPreviousRegionLocation. 504 private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName, 505 byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { 506 // AFTER should be convert to CURRENT before calling this method 507 assert !locateType.equals(RegionLocateType.AFTER); 508 TableCache tableCache = getTableCache(tableName); 509 if (!reload) { 510 RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType); 511 if (isGood(locs, replicaId)) { 512 return CompletableFuture.completedFuture(locs); 513 } 514 } 515 CompletableFuture<RegionLocations> future; 516 LocateRequest req; 517 boolean sendRequest = false; 518 synchronized (tableCache) { 519 // check again 520 if (!reload) { 521 RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType); 522 if (isGood(locs, replicaId)) { 523 return CompletableFuture.completedFuture(locs); 524 } 525 } 526 req = new LocateRequest(row, locateType); 527 future = tableCache.allRequests.get(req); 528 if (future == null) { 529 future = new CompletableFuture<>(); 530 tableCache.allRequests.put(req, future); 531 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { 532 tableCache.send(req); 533 sendRequest = true; 534 } 535 } 536 } 537 if (sendRequest) { 538 locateInMeta(tableName, req); 539 } 540 return future; 541 } 542 543 CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, 544 int replicaId, RegionLocateType locateType, boolean reload) { 545 // as we know the exact row after us, so we can just create the new row, and use the same 546 // algorithm to locate it. 547 if (locateType.equals(RegionLocateType.AFTER)) { 548 row = createClosestRowAfter(row); 549 locateType = RegionLocateType.CURRENT; 550 } 551 return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); 552 } 553 554 private void recordClearRegionCache() { 555 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion); 556 } 557 558 private void removeLocationFromCache(HRegionLocation loc) { 559 TableCache tableCache = cache.get(loc.getRegion().getTable()); 560 if (tableCache != null) { 561 if (tableCache.regionLocationCache.remove(loc)) { 562 recordClearRegionCache(); 563 updateMetaReplicaSelector(loc); 564 } 565 } 566 } 567 568 private void updateMetaReplicaSelector(HRegionLocation loc) { 569 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 570 // with timestamp internally. Next time the client looks up the same location, 571 // it will pick a different meta replica region. 572 if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 573 metaReplicaSelector.onError(loc); 574 } 575 } 576 577 void addLocationToCache(HRegionLocation loc) { 578 getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc)); 579 } 580 581 private HRegionLocation getCachedLocation(HRegionLocation loc) { 582 TableCache tableCache = cache.get(loc.getRegion().getTable()); 583 if (tableCache == null) { 584 return null; 585 } 586 RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey()); 587 return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; 588 } 589 590 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 591 Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics(); 592 AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, 593 this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null)); 594 } 595 596 void clearCache(TableName tableName) { 597 TableCache tableCache = cache.remove(tableName); 598 if (tableCache == null) { 599 return; 600 } 601 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 602 synchronized (tableCache) { 603 if (!tableCache.allRequests.isEmpty()) { 604 IOException error = new IOException("Cache cleared"); 605 tableCache.allRequests.values().forEach(f -> { 606 futureResultList.add(new RegionLocationsFutureResult(f, null, error)); 607 }); 608 } 609 } 610 futureResultList.forEach(RegionLocationsFutureResult::complete); 611 conn.getConnectionMetrics().ifPresent( 612 metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size())); 613 } 614 615 void clearCache() { 616 cache.clear(); 617 } 618 619 void clearCache(ServerName serverName) { 620 for (TableCache tableCache : cache.values()) { 621 tableCache.regionLocationCache.removeForServer(serverName); 622 } 623 } 624 625 // only used for testing whether we have cached the location for a region. 626 RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { 627 TableCache tableCache = cache.get(tableName); 628 if (tableCache == null) { 629 return null; 630 } 631 return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 632 } 633 634 // only used for testing whether we have cached the location for a table. 635 int getNumberOfCachedRegionLocations(TableName tableName) { 636 TableCache tableCache = cache.get(tableName); 637 if (tableCache == null) { 638 return 0; 639 } 640 return tableCache.regionLocationCache.getAll().stream() 641 .mapToInt(RegionLocations::numNonNullElements).sum(); 642 } 643 644 private TimerTask getInvalidateMetaCacheTask(long metaCacheInvalidateInterval) { 645 return new TimerTask() { 646 @Override 647 public void run(Timeout timeout) throws Exception { 648 FutureUtils.addListener(invalidateTableCache(), (res, err) -> { 649 if (err != null) { 650 LOG.warn("InvalidateTableCache failed.", err); 651 } 652 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, metaCacheInvalidateInterval, 653 TimeUnit.MILLISECONDS); 654 }); 655 } 656 }; 657 } 658 659 private CompletableFuture<Void> invalidateTableCache() { 660 CompletableFuture<Void> future = new CompletableFuture<>(); 661 Iterator<TableName> tbnIter = cache.keySet().iterator(); 662 AsyncAdmin admin = conn.getAdmin(); 663 invalidateCache(future, tbnIter, admin); 664 return future; 665 } 666 667 private void invalidateCache(CompletableFuture<Void> future, Iterator<TableName> tbnIter, 668 AsyncAdmin admin) { 669 if (tbnIter.hasNext()) { 670 TableName tableName = tbnIter.next(); 671 FutureUtils.addListener(admin.isTableDisabled(tableName), (tableDisabled, err) -> { 672 boolean shouldInvalidateCache = false; 673 if (err != null) { 674 if (err instanceof TableNotFoundException) { 675 LOG.info("Table {} was not exist, will invalidate its cache.", tableName); 676 shouldInvalidateCache = true; 677 } else { 678 // If other exception occurred, just skip to invalidate it cache. 679 LOG.warn("Get table state of {} failed, skip to invalidate its cache.", tableName, err); 680 return; 681 } 682 } else if (tableDisabled) { 683 LOG.info("Table {} was disabled, will invalidate its cache.", tableName); 684 shouldInvalidateCache = true; 685 } 686 if (shouldInvalidateCache) { 687 clearCache(tableName); 688 LOG.info("Invalidate cache for {} succeed.", tableName); 689 } else { 690 LOG.debug("Table {} is normal, no need to invalidate its cache.", tableName); 691 } 692 invalidateCache(future, tbnIter, admin); 693 }); 694 } else { 695 future.complete(null); 696 } 697 } 698}