001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.NINES; 023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 024import static org.apache.hadoop.hbase.HConstants.ZEROES; 025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; 027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; 028import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; 029import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; 030import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; 031import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 032import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; 033import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 034import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; 035import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 036 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.HashSet; 041import java.util.Iterator; 042import java.util.LinkedHashMap; 043import java.util.List; 044import java.util.Map; 045import java.util.Optional; 046import java.util.Set; 047import java.util.concurrent.CompletableFuture; 048import java.util.concurrent.ConcurrentHashMap; 049import java.util.concurrent.ConcurrentMap; 050import java.util.concurrent.ConcurrentNavigableMap; 051import java.util.concurrent.ConcurrentSkipListMap; 052import java.util.concurrent.TimeUnit; 053import org.apache.commons.lang3.ObjectUtils; 054import org.apache.hadoop.hbase.CatalogReplicaMode; 055import org.apache.hadoop.hbase.HBaseIOException; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionLocation; 058import org.apache.hadoop.hbase.MetaTableAccessor; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.ServerName; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.TableNotFoundException; 063import org.apache.hadoop.hbase.client.Scan.ReadType; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.base.Objects; 070 071/** 072 * The asynchronous locator for regions other than meta. 073 */ 074@InterfaceAudience.Private 075class AsyncNonMetaRegionLocator { 076 077 private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class); 078 079 static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 080 "hbase.client.meta.max.concurrent.locate.per.table"; 081 082 private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; 083 084 static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit"; 085 086 private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10; 087 088 private final AsyncConnectionImpl conn; 089 090 private final int maxConcurrentLocateRequestPerTable; 091 092 private final int locatePrefetchLimit; 093 094 // The mode tells if HedgedRead, LoadBalance mode is supported. 095 // The default mode is CatalogReplicaMode.None. 096 private CatalogReplicaMode metaReplicaMode; 097 private CatalogReplicaLoadBalanceSelector metaReplicaSelector; 098 099 private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); 100 101 private static final class LocateRequest { 102 103 private final byte[] row; 104 105 private final RegionLocateType locateType; 106 107 public LocateRequest(byte[] row, RegionLocateType locateType) { 108 this.row = row; 109 this.locateType = locateType; 110 } 111 112 @Override 113 public int hashCode() { 114 return Bytes.hashCode(row) ^ locateType.hashCode(); 115 } 116 117 @Override 118 public boolean equals(Object obj) { 119 if (obj == null || obj.getClass() != LocateRequest.class) { 120 return false; 121 } 122 LocateRequest that = (LocateRequest) obj; 123 return locateType.equals(that.locateType) && Bytes.equals(row, that.row); 124 } 125 } 126 127 private static final class RegionLocationsFutureResult { 128 private final CompletableFuture<RegionLocations> future; 129 private final RegionLocations result; 130 private final Throwable e; 131 132 public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future, 133 RegionLocations result, Throwable e) { 134 this.future = future; 135 this.result = result; 136 this.e = e; 137 } 138 139 public void complete() { 140 if (e != null) { 141 future.completeExceptionally(e); 142 } 143 future.complete(result); 144 } 145 } 146 147 private static final class TableCache { 148 149 private final ConcurrentNavigableMap<byte[], RegionLocations> cache = 150 new ConcurrentSkipListMap<>(BYTES_COMPARATOR); 151 152 private final Set<LocateRequest> pendingRequests = new HashSet<>(); 153 154 private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests = 155 new LinkedHashMap<>(); 156 157 public boolean hasQuota(int max) { 158 return pendingRequests.size() < max; 159 } 160 161 public boolean isPending(LocateRequest req) { 162 return pendingRequests.contains(req); 163 } 164 165 public void send(LocateRequest req) { 166 pendingRequests.add(req); 167 } 168 169 public Optional<LocateRequest> getCandidate() { 170 return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); 171 } 172 173 public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) { 174 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 175 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter = 176 allRequests.entrySet().iterator(); iter.hasNext();) { 177 Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next(); 178 if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) { 179 iter.remove(); 180 } 181 } 182 return futureResultList; 183 } 184 185 private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future, 186 RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) { 187 if (future.isDone()) { 188 return true; 189 } 190 if (locations == null) { 191 return false; 192 } 193 HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations()); 194 // we should at least have one location available, otherwise the request should fail and 195 // should not arrive here 196 assert loc != null; 197 boolean completed; 198 if (req.locateType.equals(RegionLocateType.BEFORE)) { 199 // for locating the row before current row, the common case is to find the previous region 200 // in reverse scan, so we check the endKey first. In general, the condition should be 201 // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row || 202 // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey < 203 // endKey. 204 byte[] endKey = loc.getRegion().getEndKey(); 205 int c = Bytes.compareTo(endKey, req.row); 206 completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) 207 && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0); 208 } else { 209 completed = loc.getRegion().containsRow(req.row); 210 } 211 if (completed) { 212 futureResultList.add(new RegionLocationsFutureResult(future, locations, null)); 213 return true; 214 } else { 215 return false; 216 } 217 } 218 } 219 220 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { 221 this.conn = conn; 222 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( 223 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); 224 this.locatePrefetchLimit = 225 conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); 226 227 // Get the region locator's meta replica mode. 228 this.metaReplicaMode = CatalogReplicaMode.fromString( 229 conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); 230 231 switch (this.metaReplicaMode) { 232 case LOAD_BALANCE: 233 String replicaSelectorClass = 234 conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR, 235 CatalogReplicaLoadBalanceSimpleSelector.class.getName()); 236 237 this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory 238 .createSelector(replicaSelectorClass, META_TABLE_NAME, conn.getChoreService(), () -> { 239 int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS; 240 try { 241 RegionLocations metaLocations = conn.registry.getMetaRegionLocations() 242 .get(conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS); 243 numOfReplicas = metaLocations.size(); 244 } catch (Exception e) { 245 LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e); 246 } 247 return numOfReplicas; 248 }); 249 break; 250 case NONE: 251 // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config. 252 253 boolean useMetaReplicas = 254 conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS); 255 if (useMetaReplicas) { 256 this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ; 257 } 258 break; 259 default: 260 // Doing nothing 261 } 262 } 263 264 private TableCache getTableCache(TableName tableName) { 265 return computeIfAbsent(cache, tableName, TableCache::new); 266 } 267 268 private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { 269 HRegionLocation[] locArr1 = locs1.getRegionLocations(); 270 HRegionLocation[] locArr2 = locs2.getRegionLocations(); 271 if (locArr1.length != locArr2.length) { 272 return false; 273 } 274 for (int i = 0; i < locArr1.length; i++) { 275 // do not need to compare region info 276 HRegionLocation loc1 = locArr1[i]; 277 HRegionLocation loc2 = locArr2[i]; 278 if (loc1 == null) { 279 if (loc2 != null) { 280 return false; 281 } 282 } else { 283 if (loc2 == null) { 284 return false; 285 } 286 if (loc1.getSeqNum() != loc2.getSeqNum()) { 287 return false; 288 } 289 if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { 290 return false; 291 } 292 } 293 } 294 return true; 295 } 296 297 // if we successfully add the locations to cache, return the locations, otherwise return the one 298 // which prevents us being added. The upper layer can use this value to complete pending requests. 299 private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) { 300 LOG.trace("Try adding {} to cache", locs); 301 byte[] startKey = locs.getRegionLocation().getRegion().getStartKey(); 302 for (;;) { 303 RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); 304 if (oldLocs == null) { 305 return locs; 306 } 307 // check whether the regions are the same, this usually happens when table is split/merged, or 308 // deleted and recreated again. 309 RegionInfo region = locs.getRegionLocation().getRegion(); 310 RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); 311 if (region.getEncodedName().equals(oldRegion.getEncodedName())) { 312 RegionLocations mergedLocs = oldLocs.mergeLocations(locs); 313 if (isEqual(mergedLocs, oldLocs)) { 314 // the merged one is the same with the old one, give up 315 LOG.trace("Will not add {} to cache because the old value {} " 316 + " is newer than us or has the same server name." 317 + " Maybe it is updated before we replace it", locs, oldLocs); 318 return oldLocs; 319 } 320 if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { 321 return mergedLocs; 322 } 323 } else { 324 // the region is different, here we trust the one we fetched. This maybe wrong but finally 325 // the upper layer can detect this and trigger removal of the wrong locations 326 if (LOG.isDebugEnabled()) { 327 LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," 328 + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); 329 } 330 if (tableCache.cache.replace(startKey, oldLocs, locs)) { 331 return locs; 332 } 333 } 334 } 335 } 336 337 private void complete(TableName tableName, LocateRequest req, RegionLocations locs, 338 Throwable error) { 339 if (error != null) { 340 LOG.warn("Failed to locate region in '" + tableName + "', row='" 341 + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); 342 } 343 Optional<LocateRequest> toSend = Optional.empty(); 344 TableCache tableCache = getTableCache(tableName); 345 if (locs != null) { 346 RegionLocations addedLocs = addToCache(tableCache, locs); 347 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 348 synchronized (tableCache) { 349 tableCache.pendingRequests.remove(req); 350 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 351 // Remove a complete locate request in a synchronized block, so the table cache must have 352 // quota to send a candidate request. 353 toSend = tableCache.getCandidate(); 354 toSend.ifPresent(r -> tableCache.send(r)); 355 } 356 futureResultList.forEach(RegionLocationsFutureResult::complete); 357 toSend.ifPresent(r -> locateInMeta(tableName, r)); 358 } else { 359 // we meet an error 360 assert error != null; 361 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 362 synchronized (tableCache) { 363 tableCache.pendingRequests.remove(req); 364 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have 365 // already retried several times 366 CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req); 367 if (future != null) { 368 futureResultList.add(new RegionLocationsFutureResult(future, null, error)); 369 } 370 futureResultList.addAll(tableCache.clearCompletedRequests(null)); 371 // Remove a complete locate request in a synchronized block, so the table cache must have 372 // quota to send a candidate request. 373 toSend = tableCache.getCandidate(); 374 toSend.ifPresent(r -> tableCache.send(r)); 375 } 376 futureResultList.forEach(RegionLocationsFutureResult::complete); 377 toSend.ifPresent(r -> locateInMeta(tableName, r)); 378 } 379 } 380 381 // return whether we should stop the scan 382 private boolean onScanNext(TableName tableName, LocateRequest req, Result result) { 383 RegionLocations locs = MetaTableAccessor.getRegionLocations(result); 384 if (LOG.isDebugEnabled()) { 385 LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, 386 Bytes.toStringBinary(req.row), req.locateType, locs); 387 } 388 // remove HRegionLocation with null location, i.e, getServerName returns null. 389 if (locs != null) { 390 locs = locs.removeElementsWithNullLocation(); 391 } 392 393 // the default region location should always be presented when fetching from meta, otherwise 394 // let's fail the request. 395 if (locs == null || locs.getDefaultRegionLocation() == null) { 396 complete(tableName, req, null, 397 new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", 398 tableName, Bytes.toStringBinary(req.row), req.locateType))); 399 return true; 400 } 401 HRegionLocation loc = locs.getDefaultRegionLocation(); 402 RegionInfo info = loc.getRegion(); 403 if (info == null) { 404 complete(tableName, req, null, 405 new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", 406 tableName, Bytes.toStringBinary(req.row), req.locateType))); 407 return true; 408 } 409 if (info.isSplitParent()) { 410 return false; 411 } 412 complete(tableName, req, locs, null); 413 return true; 414 } 415 416 private void recordCacheHit() { 417 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit); 418 } 419 420 private void recordCacheMiss() { 421 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); 422 } 423 424 private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, 425 int replicaId) { 426 Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row); 427 if (entry == null) { 428 recordCacheMiss(); 429 return null; 430 } 431 RegionLocations locs = entry.getValue(); 432 HRegionLocation loc = locs.getRegionLocation(replicaId); 433 if (loc == null) { 434 recordCacheMiss(); 435 return null; 436 } 437 byte[] endKey = loc.getRegion().getEndKey(); 438 if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { 439 if (LOG.isTraceEnabled()) { 440 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 441 Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); 442 } 443 recordCacheHit(); 444 return locs; 445 } else { 446 recordCacheMiss(); 447 return null; 448 } 449 } 450 451 private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, 452 byte[] row, int replicaId) { 453 boolean isEmptyStopRow = isEmptyStopRow(row); 454 Map.Entry<byte[], RegionLocations> entry = 455 isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); 456 if (entry == null) { 457 recordCacheMiss(); 458 return null; 459 } 460 RegionLocations locs = entry.getValue(); 461 HRegionLocation loc = locs.getRegionLocation(replicaId); 462 if (loc == null) { 463 recordCacheMiss(); 464 return null; 465 } 466 if ( 467 isEmptyStopRow(loc.getRegion().getEndKey()) 468 || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) 469 ) { 470 if (LOG.isTraceEnabled()) { 471 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 472 Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); 473 } 474 recordCacheHit(); 475 return locs; 476 } else { 477 recordCacheMiss(); 478 return null; 479 } 480 } 481 482 private void locateInMeta(TableName tableName, LocateRequest req) { 483 if (LOG.isTraceEnabled()) { 484 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) 485 + "', locateType=" + req.locateType + " in meta"); 486 } 487 byte[] metaStartKey; 488 if (req.locateType.equals(RegionLocateType.BEFORE)) { 489 if (isEmptyStopRow(req.row)) { 490 byte[] binaryTableName = tableName.getName(); 491 metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); 492 } else { 493 metaStartKey = createRegionName(tableName, req.row, ZEROES, false); 494 } 495 } else { 496 metaStartKey = createRegionName(tableName, req.row, NINES, false); 497 } 498 byte[] metaStopKey = 499 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 500 Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 501 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) 502 .setReadType(ReadType.PREAD); 503 504 switch (this.metaReplicaMode) { 505 case LOAD_BALANCE: 506 int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType); 507 if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { 508 // If the selector gives a non-primary meta replica region, then go with it. 509 // Otherwise, just go to primary in non-hedgedRead mode. 510 scan.setConsistency(Consistency.TIMELINE); 511 scan.setReplicaId(metaReplicaId); 512 } 513 break; 514 case HEDGED_READ: 515 scan.setConsistency(Consistency.TIMELINE); 516 break; 517 default: 518 // do nothing 519 } 520 521 conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() { 522 523 private boolean completeNormally = false; 524 525 private boolean tableNotFound = true; 526 527 @Override 528 public void onError(Throwable error) { 529 complete(tableName, req, null, error); 530 } 531 532 @Override 533 public void onComplete() { 534 if (tableNotFound) { 535 complete(tableName, req, null, new TableNotFoundException(tableName)); 536 } else if (!completeNormally) { 537 complete(tableName, req, null, new IOException( 538 "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName)); 539 } 540 } 541 542 @Override 543 public void onNext(Result[] results, ScanController controller) { 544 if (results.length == 0) { 545 return; 546 } 547 tableNotFound = false; 548 int i = 0; 549 for (; i < results.length; i++) { 550 if (onScanNext(tableName, req, results[i])) { 551 completeNormally = true; 552 controller.terminate(); 553 i++; 554 break; 555 } 556 } 557 // Add the remaining results into cache 558 if (i < results.length) { 559 TableCache tableCache = getTableCache(tableName); 560 for (; i < results.length; i++) { 561 RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]); 562 if (locs == null) { 563 continue; 564 } 565 HRegionLocation loc = locs.getDefaultRegionLocation(); 566 if (loc == null) { 567 continue; 568 } 569 RegionInfo info = loc.getRegion(); 570 if (info == null || info.isOffline() || info.isSplitParent()) { 571 continue; 572 } 573 RegionLocations addedLocs = addToCache(tableCache, locs); 574 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 575 synchronized (tableCache) { 576 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 577 } 578 futureResultList.forEach(RegionLocationsFutureResult::complete); 579 } 580 } 581 } 582 }); 583 } 584 585 private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, 586 int replicaId, RegionLocateType locateType) { 587 return locateType.equals(RegionLocateType.BEFORE) 588 ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) 589 : locateRowInCache(tableCache, tableName, row, replicaId); 590 } 591 592 // locateToPrevious is true means we will use the start key of a region to locate the region 593 // placed before it. Used for reverse scan. See the comment of 594 // AsyncRegionLocator.getPreviousRegionLocation. 595 private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName, 596 byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { 597 // AFTER should be convert to CURRENT before calling this method 598 assert !locateType.equals(RegionLocateType.AFTER); 599 TableCache tableCache = getTableCache(tableName); 600 if (!reload) { 601 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 602 if (isGood(locs, replicaId)) { 603 return CompletableFuture.completedFuture(locs); 604 } 605 } 606 CompletableFuture<RegionLocations> future; 607 LocateRequest req; 608 boolean sendRequest = false; 609 synchronized (tableCache) { 610 // check again 611 if (!reload) { 612 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 613 if (isGood(locs, replicaId)) { 614 return CompletableFuture.completedFuture(locs); 615 } 616 } 617 req = new LocateRequest(row, locateType); 618 future = tableCache.allRequests.get(req); 619 if (future == null) { 620 future = new CompletableFuture<>(); 621 tableCache.allRequests.put(req, future); 622 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { 623 tableCache.send(req); 624 sendRequest = true; 625 } 626 } 627 } 628 if (sendRequest) { 629 locateInMeta(tableName, req); 630 } 631 return future; 632 } 633 634 CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, 635 int replicaId, RegionLocateType locateType, boolean reload) { 636 // as we know the exact row after us, so we can just create the new row, and use the same 637 // algorithm to locate it. 638 if (locateType.equals(RegionLocateType.AFTER)) { 639 row = createClosestRowAfter(row); 640 locateType = RegionLocateType.CURRENT; 641 } 642 return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); 643 } 644 645 private void recordClearRegionCache() { 646 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion); 647 } 648 649 private void removeLocationFromCache(HRegionLocation loc) { 650 TableCache tableCache = cache.get(loc.getRegion().getTable()); 651 if (tableCache == null) { 652 return; 653 } 654 byte[] startKey = loc.getRegion().getStartKey(); 655 for (;;) { 656 RegionLocations oldLocs = tableCache.cache.get(startKey); 657 if (oldLocs == null) { 658 return; 659 } 660 HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); 661 if (!canUpdateOnError(loc, oldLoc)) { 662 return; 663 } 664 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 665 // with timestamp internally. Next time the client looks up the same location, 666 // it will pick a different meta replica region. 667 if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 668 metaReplicaSelector.onError(loc); 669 } 670 671 RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); 672 if (newLocs == null) { 673 if (tableCache.cache.remove(startKey, oldLocs)) { 674 recordClearRegionCache(); 675 return; 676 } 677 } else { 678 if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { 679 recordClearRegionCache(); 680 return; 681 } 682 } 683 } 684 } 685 686 void addLocationToCache(HRegionLocation loc) { 687 addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); 688 } 689 690 private HRegionLocation getCachedLocation(HRegionLocation loc) { 691 TableCache tableCache = cache.get(loc.getRegion().getTable()); 692 if (tableCache == null) { 693 return null; 694 } 695 RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); 696 return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; 697 } 698 699 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 700 Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics(); 701 AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, 702 this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null)); 703 } 704 705 void clearCache(TableName tableName) { 706 TableCache tableCache = cache.remove(tableName); 707 if (tableCache == null) { 708 return; 709 } 710 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 711 synchronized (tableCache) { 712 if (!tableCache.allRequests.isEmpty()) { 713 IOException error = new IOException("Cache cleared"); 714 tableCache.allRequests.values().forEach(f -> { 715 futureResultList.add(new RegionLocationsFutureResult(f, null, error)); 716 }); 717 } 718 } 719 futureResultList.forEach(RegionLocationsFutureResult::complete); 720 conn.getConnectionMetrics() 721 .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); 722 } 723 724 void clearCache() { 725 cache.clear(); 726 } 727 728 void clearCache(ServerName serverName) { 729 for (TableCache tableCache : cache.values()) { 730 for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) { 731 byte[] regionName = entry.getKey(); 732 RegionLocations locs = entry.getValue(); 733 RegionLocations newLocs = locs.removeByServer(serverName); 734 if (locs == newLocs) { 735 continue; 736 } 737 if (newLocs.isEmpty()) { 738 tableCache.cache.remove(regionName, locs); 739 } else { 740 tableCache.cache.replace(regionName, locs, newLocs); 741 } 742 } 743 } 744 } 745 746 // only used for testing whether we have cached the location for a region. 747 RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { 748 TableCache tableCache = cache.get(tableName); 749 if (tableCache == null) { 750 return null; 751 } 752 return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 753 } 754}