001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.NINES; 023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 024import static org.apache.hadoop.hbase.HConstants.ZEROES; 025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; 027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; 028import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; 029import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; 030import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; 031import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 032import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; 033import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; 034import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 035 036import java.io.IOException; 037import java.util.Arrays; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.LinkedHashMap; 041import java.util.Map; 042import java.util.Optional; 043import java.util.Set; 044import java.util.concurrent.CompletableFuture; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.concurrent.ConcurrentNavigableMap; 048import java.util.concurrent.ConcurrentSkipListMap; 049import org.apache.commons.lang3.ObjectUtils; 050import org.apache.hadoop.hbase.HBaseIOException; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.MetaTableAccessor; 054import org.apache.hadoop.hbase.RegionLocations; 055import org.apache.hadoop.hbase.ServerName; 056import org.apache.hadoop.hbase.TableName; 057import org.apache.hadoop.hbase.TableNotFoundException; 058import org.apache.hadoop.hbase.client.Scan.ReadType; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 065import org.apache.hbase.thirdparty.com.google.common.base.Objects; 066 067/** 068 * The asynchronous locator for regions other than meta. 069 */ 070@InterfaceAudience.Private 071class AsyncNonMetaRegionLocator { 072 073 private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class); 074 075 @VisibleForTesting 076 static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 077 "hbase.client.meta.max.concurrent.locate.per.table"; 078 079 private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; 080 081 @VisibleForTesting 082 static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit"; 083 084 private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10; 085 086 private final AsyncConnectionImpl conn; 087 088 private final int maxConcurrentLocateRequestPerTable; 089 090 private final int locatePrefetchLimit; 091 092 private final boolean useMetaReplicas; 093 094 private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); 095 096 private static final class LocateRequest { 097 098 private final byte[] row; 099 100 private final RegionLocateType locateType; 101 102 public LocateRequest(byte[] row, RegionLocateType locateType) { 103 this.row = row; 104 this.locateType = locateType; 105 } 106 107 @Override 108 public int hashCode() { 109 return Bytes.hashCode(row) ^ locateType.hashCode(); 110 } 111 112 @Override 113 public boolean equals(Object obj) { 114 if (obj == null || obj.getClass() != LocateRequest.class) { 115 return false; 116 } 117 LocateRequest that = (LocateRequest) obj; 118 return locateType.equals(that.locateType) && Bytes.equals(row, that.row); 119 } 120 } 121 122 private static final class TableCache { 123 124 private final ConcurrentNavigableMap<byte[], RegionLocations> cache = 125 new ConcurrentSkipListMap<>(BYTES_COMPARATOR); 126 127 private final Set<LocateRequest> pendingRequests = new HashSet<>(); 128 129 private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests = 130 new LinkedHashMap<>(); 131 132 public boolean hasQuota(int max) { 133 return pendingRequests.size() < max; 134 } 135 136 public boolean isPending(LocateRequest req) { 137 return pendingRequests.contains(req); 138 } 139 140 public void send(LocateRequest req) { 141 pendingRequests.add(req); 142 } 143 144 public Optional<LocateRequest> getCandidate() { 145 return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); 146 } 147 148 public void clearCompletedRequests(RegionLocations locations) { 149 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter = 150 allRequests.entrySet().iterator(); iter.hasNext();) { 151 Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next(); 152 if (tryComplete(entry.getKey(), entry.getValue(), locations)) { 153 iter.remove(); 154 } 155 } 156 } 157 158 private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future, 159 RegionLocations locations) { 160 if (future.isDone()) { 161 return true; 162 } 163 if (locations == null) { 164 return false; 165 } 166 HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations()); 167 // we should at least have one location available, otherwise the request should fail and 168 // should not arrive here 169 assert loc != null; 170 boolean completed; 171 if (req.locateType.equals(RegionLocateType.BEFORE)) { 172 // for locating the row before current row, the common case is to find the previous region 173 // in reverse scan, so we check the endKey first. In general, the condition should be 174 // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row || 175 // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey < 176 // endKey. 177 byte[] endKey = loc.getRegion().getEndKey(); 178 int c = Bytes.compareTo(endKey, req.row); 179 completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) && 180 Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0); 181 } else { 182 completed = loc.getRegion().containsRow(req.row); 183 } 184 if (completed) { 185 future.complete(locations); 186 return true; 187 } else { 188 return false; 189 } 190 } 191 } 192 193 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { 194 this.conn = conn; 195 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( 196 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); 197 this.locatePrefetchLimit = 198 conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); 199 this.useMetaReplicas = 200 conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS); 201 } 202 203 private TableCache getTableCache(TableName tableName) { 204 return computeIfAbsent(cache, tableName, TableCache::new); 205 } 206 207 private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { 208 HRegionLocation[] locArr1 = locs1.getRegionLocations(); 209 HRegionLocation[] locArr2 = locs2.getRegionLocations(); 210 if (locArr1.length != locArr2.length) { 211 return false; 212 } 213 for (int i = 0; i < locArr1.length; i++) { 214 // do not need to compare region info 215 HRegionLocation loc1 = locArr1[i]; 216 HRegionLocation loc2 = locArr2[i]; 217 if (loc1 == null) { 218 if (loc2 != null) { 219 return false; 220 } 221 } else { 222 if (loc2 == null) { 223 return false; 224 } 225 if (loc1.getSeqNum() != loc2.getSeqNum()) { 226 return false; 227 } 228 if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { 229 return false; 230 } 231 } 232 } 233 return true; 234 } 235 236 // if we successfully add the locations to cache, return the locations, otherwise return the one 237 // which prevents us being added. The upper layer can use this value to complete pending requests. 238 private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) { 239 LOG.trace("Try adding {} to cache", locs); 240 byte[] startKey = locs.getRegionLocation().getRegion().getStartKey(); 241 for (;;) { 242 RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); 243 if (oldLocs == null) { 244 return locs; 245 } 246 // check whether the regions are the same, this usually happens when table is split/merged, or 247 // deleted and recreated again. 248 RegionInfo region = locs.getRegionLocation().getRegion(); 249 RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); 250 if (region.getEncodedName().equals(oldRegion.getEncodedName())) { 251 RegionLocations mergedLocs = oldLocs.mergeLocations(locs); 252 if (isEqual(mergedLocs, oldLocs)) { 253 // the merged one is the same with the old one, give up 254 LOG.trace("Will not add {} to cache because the old value {} " + 255 " is newer than us or has the same server name." + 256 " Maybe it is updated before we replace it", locs, oldLocs); 257 return oldLocs; 258 } 259 if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { 260 return mergedLocs; 261 } 262 } else { 263 // the region is different, here we trust the one we fetched. This maybe wrong but finally 264 // the upper layer can detect this and trigger removal of the wrong locations 265 if (LOG.isDebugEnabled()) { 266 LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," + 267 " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); 268 } 269 if (tableCache.cache.replace(startKey, oldLocs, locs)) { 270 return locs; 271 } 272 } 273 } 274 } 275 276 private void complete(TableName tableName, LocateRequest req, RegionLocations locs, 277 Throwable error) { 278 if (error != null) { 279 LOG.warn("Failed to locate region in '" + tableName + "', row='" + 280 Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); 281 } 282 Optional<LocateRequest> toSend = Optional.empty(); 283 TableCache tableCache = getTableCache(tableName); 284 if (locs != null) { 285 RegionLocations addedLocs = addToCache(tableCache, locs); 286 synchronized (tableCache) { 287 tableCache.pendingRequests.remove(req); 288 tableCache.clearCompletedRequests(addedLocs); 289 // Remove a complete locate request in a synchronized block, so the table cache must have 290 // quota to send a candidate request. 291 toSend = tableCache.getCandidate(); 292 toSend.ifPresent(r -> tableCache.send(r)); 293 } 294 toSend.ifPresent(r -> locateInMeta(tableName, r)); 295 } else { 296 // we meet an error 297 assert error != null; 298 synchronized (tableCache) { 299 tableCache.pendingRequests.remove(req); 300 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have 301 // already retried several times 302 CompletableFuture<?> future = tableCache.allRequests.remove(req); 303 if (future != null) { 304 future.completeExceptionally(error); 305 } 306 tableCache.clearCompletedRequests(null); 307 // Remove a complete locate request in a synchronized block, so the table cache must have 308 // quota to send a candidate request. 309 toSend = tableCache.getCandidate(); 310 toSend.ifPresent(r -> tableCache.send(r)); 311 } 312 toSend.ifPresent(r -> locateInMeta(tableName, r)); 313 } 314 } 315 316 // return whether we should stop the scan 317 private boolean onScanNext(TableName tableName, LocateRequest req, Result result) { 318 RegionLocations locs = MetaTableAccessor.getRegionLocations(result); 319 if (LOG.isDebugEnabled()) { 320 LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, 321 Bytes.toStringBinary(req.row), req.locateType, locs); 322 } 323 // remove HRegionLocation with null location, i.e, getServerName returns null. 324 if (locs != null) { 325 locs = locs.removeElementsWithNullLocation(); 326 } 327 328 // the default region location should always be presented when fetching from meta, otherwise 329 // let's fail the request. 330 if (locs == null || locs.getDefaultRegionLocation() == null) { 331 complete(tableName, req, null, 332 new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", 333 tableName, Bytes.toStringBinary(req.row), req.locateType))); 334 return true; 335 } 336 HRegionLocation loc = locs.getDefaultRegionLocation(); 337 RegionInfo info = loc.getRegion(); 338 if (info == null) { 339 complete(tableName, req, null, 340 new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", 341 tableName, Bytes.toStringBinary(req.row), req.locateType))); 342 return true; 343 } 344 if (info.isSplitParent()) { 345 return false; 346 } 347 complete(tableName, req, locs, null); 348 return true; 349 } 350 351 private void recordCacheHit() { 352 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit); 353 } 354 355 private void recordCacheMiss() { 356 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); 357 } 358 359 private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, 360 int replicaId) { 361 Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row); 362 if (entry == null) { 363 recordCacheMiss(); 364 return null; 365 } 366 RegionLocations locs = entry.getValue(); 367 HRegionLocation loc = locs.getRegionLocation(replicaId); 368 if (loc == null) { 369 recordCacheMiss(); 370 return null; 371 } 372 byte[] endKey = loc.getRegion().getEndKey(); 373 if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { 374 if (LOG.isTraceEnabled()) { 375 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 376 Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); 377 } 378 recordCacheHit(); 379 return locs; 380 } else { 381 recordCacheMiss(); 382 return null; 383 } 384 } 385 386 private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, 387 byte[] row, int replicaId) { 388 boolean isEmptyStopRow = isEmptyStopRow(row); 389 Map.Entry<byte[], RegionLocations> entry = 390 isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); 391 if (entry == null) { 392 recordCacheMiss(); 393 return null; 394 } 395 RegionLocations locs = entry.getValue(); 396 HRegionLocation loc = locs.getRegionLocation(replicaId); 397 if (loc == null) { 398 recordCacheMiss(); 399 return null; 400 } 401 if (isEmptyStopRow(loc.getRegion().getEndKey()) || 402 (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) { 403 if (LOG.isTraceEnabled()) { 404 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 405 Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); 406 } 407 recordCacheHit(); 408 return locs; 409 } else { 410 recordCacheMiss(); 411 return null; 412 } 413 } 414 415 private void locateInMeta(TableName tableName, LocateRequest req) { 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) + 418 "', locateType=" + req.locateType + " in meta"); 419 } 420 byte[] metaStartKey; 421 if (req.locateType.equals(RegionLocateType.BEFORE)) { 422 if (isEmptyStopRow(req.row)) { 423 byte[] binaryTableName = tableName.getName(); 424 metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); 425 } else { 426 metaStartKey = createRegionName(tableName, req.row, ZEROES, false); 427 } 428 } else { 429 metaStartKey = createRegionName(tableName, req.row, NINES, false); 430 } 431 byte[] metaStopKey = 432 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 433 Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 434 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) 435 .setReadType(ReadType.PREAD); 436 if (useMetaReplicas) { 437 scan.setConsistency(Consistency.TIMELINE); 438 } 439 conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() { 440 441 private boolean completeNormally = false; 442 443 private boolean tableNotFound = true; 444 445 @Override 446 public void onError(Throwable error) { 447 complete(tableName, req, null, error); 448 } 449 450 @Override 451 public void onComplete() { 452 if (tableNotFound) { 453 complete(tableName, req, null, new TableNotFoundException(tableName)); 454 } else if (!completeNormally) { 455 complete(tableName, req, null, new IOException( 456 "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName)); 457 } 458 } 459 460 @Override 461 public void onNext(Result[] results, ScanController controller) { 462 if (results.length == 0) { 463 return; 464 } 465 tableNotFound = false; 466 int i = 0; 467 for (; i < results.length; i++) { 468 if (onScanNext(tableName, req, results[i])) { 469 completeNormally = true; 470 controller.terminate(); 471 i++; 472 break; 473 } 474 } 475 // Add the remaining results into cache 476 if (i < results.length) { 477 TableCache tableCache = getTableCache(tableName); 478 for (; i < results.length; i++) { 479 RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]); 480 if (locs == null) { 481 continue; 482 } 483 HRegionLocation loc = locs.getDefaultRegionLocation(); 484 if (loc == null) { 485 continue; 486 } 487 RegionInfo info = loc.getRegion(); 488 if (info == null || info.isOffline() || info.isSplitParent()) { 489 continue; 490 } 491 RegionLocations addedLocs = addToCache(tableCache, locs); 492 synchronized (tableCache) { 493 tableCache.clearCompletedRequests(addedLocs); 494 } 495 } 496 } 497 } 498 }); 499 } 500 501 private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, 502 int replicaId, RegionLocateType locateType) { 503 return locateType.equals(RegionLocateType.BEFORE) 504 ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) 505 : locateRowInCache(tableCache, tableName, row, replicaId); 506 } 507 508 // locateToPrevious is true means we will use the start key of a region to locate the region 509 // placed before it. Used for reverse scan. See the comment of 510 // AsyncRegionLocator.getPreviousRegionLocation. 511 private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName, 512 byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { 513 // AFTER should be convert to CURRENT before calling this method 514 assert !locateType.equals(RegionLocateType.AFTER); 515 TableCache tableCache = getTableCache(tableName); 516 if (!reload) { 517 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 518 if (isGood(locs, replicaId)) { 519 return CompletableFuture.completedFuture(locs); 520 } 521 } 522 CompletableFuture<RegionLocations> future; 523 LocateRequest req; 524 boolean sendRequest = false; 525 synchronized (tableCache) { 526 // check again 527 if (!reload) { 528 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 529 if (isGood(locs, replicaId)) { 530 return CompletableFuture.completedFuture(locs); 531 } 532 } 533 req = new LocateRequest(row, locateType); 534 future = tableCache.allRequests.get(req); 535 if (future == null) { 536 future = new CompletableFuture<>(); 537 tableCache.allRequests.put(req, future); 538 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { 539 tableCache.send(req); 540 sendRequest = true; 541 } 542 } 543 } 544 if (sendRequest) { 545 locateInMeta(tableName, req); 546 } 547 return future; 548 } 549 550 CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, 551 int replicaId, RegionLocateType locateType, boolean reload) { 552 // as we know the exact row after us, so we can just create the new row, and use the same 553 // algorithm to locate it. 554 if (locateType.equals(RegionLocateType.AFTER)) { 555 row = createClosestRowAfter(row); 556 locateType = RegionLocateType.CURRENT; 557 } 558 return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); 559 } 560 561 private void recordClearRegionCache() { 562 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion); 563 } 564 565 private void removeLocationFromCache(HRegionLocation loc) { 566 TableCache tableCache = cache.get(loc.getRegion().getTable()); 567 if (tableCache == null) { 568 return; 569 } 570 byte[] startKey = loc.getRegion().getStartKey(); 571 for (;;) { 572 RegionLocations oldLocs = tableCache.cache.get(startKey); 573 if (oldLocs == null) { 574 return; 575 } 576 HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); 577 if (!canUpdateOnError(loc, oldLoc)) { 578 return; 579 } 580 RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); 581 if (newLocs == null) { 582 if (tableCache.cache.remove(startKey, oldLocs)) { 583 recordClearRegionCache(); 584 return; 585 } 586 } else { 587 if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { 588 recordClearRegionCache(); 589 return; 590 } 591 } 592 } 593 } 594 595 private void addLocationToCache(HRegionLocation loc) { 596 addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); 597 } 598 599 private HRegionLocation getCachedLocation(HRegionLocation loc) { 600 TableCache tableCache = cache.get(loc.getRegion().getTable()); 601 if (tableCache == null) { 602 return null; 603 } 604 RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); 605 return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; 606 } 607 608 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 609 Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics(); 610 AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, 611 this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null)); 612 } 613 614 void clearCache(TableName tableName) { 615 TableCache tableCache = cache.remove(tableName); 616 if (tableCache == null) { 617 return; 618 } 619 synchronized (tableCache) { 620 if (!tableCache.allRequests.isEmpty()) { 621 IOException error = new IOException("Cache cleared"); 622 tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); 623 } 624 } 625 conn.getConnectionMetrics() 626 .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); 627 } 628 629 void clearCache() { 630 cache.clear(); 631 } 632 633 void clearCache(ServerName serverName) { 634 for (TableCache tableCache : cache.values()) { 635 for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) { 636 byte[] regionName = entry.getKey(); 637 RegionLocations locs = entry.getValue(); 638 RegionLocations newLocs = locs.removeByServer(serverName); 639 if (locs == newLocs) { 640 continue; 641 } 642 if (newLocs.isEmpty()) { 643 tableCache.cache.remove(regionName, locs); 644 } else { 645 tableCache.cache.replace(regionName, locs, newLocs); 646 } 647 } 648 } 649 } 650 651 // only used for testing whether we have cached the location for a region. 652 @VisibleForTesting 653 RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { 654 TableCache tableCache = cache.get(tableName); 655 if (tableCache == null) { 656 return null; 657 } 658 return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 659 } 660}