001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 021import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; 022import static org.apache.hadoop.hbase.HConstants.NINES; 023import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 024import static org.apache.hadoop.hbase.HConstants.ZEROES; 025import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 026import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; 027import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; 028import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; 029import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; 030import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; 031import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 032import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; 033import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 034import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; 035import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 036 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.HashSet; 041import java.util.Iterator; 042import java.util.LinkedHashMap; 043import java.util.List; 044import java.util.Map; 045import java.util.Optional; 046import java.util.Set; 047import java.util.concurrent.CompletableFuture; 048import java.util.concurrent.ConcurrentHashMap; 049import java.util.concurrent.ConcurrentMap; 050import java.util.concurrent.ConcurrentNavigableMap; 051import java.util.concurrent.ConcurrentSkipListMap; 052import java.util.concurrent.TimeUnit; 053import org.apache.commons.lang3.ObjectUtils; 054import org.apache.hadoop.hbase.CatalogFamilyFormat; 055import org.apache.hadoop.hbase.CatalogReplicaMode; 056import org.apache.hadoop.hbase.HBaseIOException; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.RegionLocations; 060import org.apache.hadoop.hbase.ServerName; 061import org.apache.hadoop.hbase.TableName; 062import org.apache.hadoop.hbase.TableNotFoundException; 063import org.apache.hadoop.hbase.client.Scan.ReadType; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.common.base.Objects; 070 071/** 072 * The asynchronous locator for regions other than meta. 073 */ 074@InterfaceAudience.Private 075class AsyncNonMetaRegionLocator { 076 077 private static final Logger LOG = LoggerFactory.getLogger(AsyncNonMetaRegionLocator.class); 078 079 static final String MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 080 "hbase.client.meta.max.concurrent.locate.per.table"; 081 082 private static final int DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE = 8; 083 084 static String LOCATE_PREFETCH_LIMIT = "hbase.client.locate.prefetch.limit"; 085 086 private static final int DEFAULT_LOCATE_PREFETCH_LIMIT = 10; 087 088 private final AsyncConnectionImpl conn; 089 090 private final int maxConcurrentLocateRequestPerTable; 091 092 private final int locatePrefetchLimit; 093 094 // The mode tells if HedgedRead, LoadBalance mode is supported. 095 // The default mode is CatalogReplicaMode.None. 096 private CatalogReplicaMode metaReplicaMode; 097 private CatalogReplicaLoadBalanceSelector metaReplicaSelector; 098 099 private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); 100 101 private static final class LocateRequest { 102 103 private final byte[] row; 104 105 private final RegionLocateType locateType; 106 107 public LocateRequest(byte[] row, RegionLocateType locateType) { 108 this.row = row; 109 this.locateType = locateType; 110 } 111 112 @Override 113 public int hashCode() { 114 return Bytes.hashCode(row) ^ locateType.hashCode(); 115 } 116 117 @Override 118 public boolean equals(Object obj) { 119 if (obj == null || obj.getClass() != LocateRequest.class) { 120 return false; 121 } 122 LocateRequest that = (LocateRequest) obj; 123 return locateType.equals(that.locateType) && Bytes.equals(row, that.row); 124 } 125 } 126 127 private static final class RegionLocationsFutureResult { 128 private final CompletableFuture<RegionLocations> future; 129 private final RegionLocations result; 130 private final Throwable e; 131 132 public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future, 133 RegionLocations result, Throwable e) { 134 this.future = future; 135 this.result = result; 136 this.e = e; 137 } 138 139 public void complete() { 140 if (e != null) { 141 future.completeExceptionally(e); 142 } 143 future.complete(result); 144 } 145 } 146 147 private static final class TableCache { 148 149 private final ConcurrentNavigableMap<byte[], RegionLocations> cache = 150 new ConcurrentSkipListMap<>(BYTES_COMPARATOR); 151 152 private final Set<LocateRequest> pendingRequests = new HashSet<>(); 153 154 private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests = 155 new LinkedHashMap<>(); 156 157 public boolean hasQuota(int max) { 158 return pendingRequests.size() < max; 159 } 160 161 public boolean isPending(LocateRequest req) { 162 return pendingRequests.contains(req); 163 } 164 165 public void send(LocateRequest req) { 166 pendingRequests.add(req); 167 } 168 169 public Optional<LocateRequest> getCandidate() { 170 return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); 171 } 172 173 public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) { 174 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 175 for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter = 176 allRequests.entrySet().iterator(); iter.hasNext();) { 177 Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next(); 178 if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) { 179 iter.remove(); 180 } 181 } 182 return futureResultList; 183 } 184 185 private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future, 186 RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) { 187 if (future.isDone()) { 188 return true; 189 } 190 if (locations == null) { 191 return false; 192 } 193 HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations()); 194 // we should at least have one location available, otherwise the request should fail and 195 // should not arrive here 196 assert loc != null; 197 boolean completed; 198 if (req.locateType.equals(RegionLocateType.BEFORE)) { 199 // for locating the row before current row, the common case is to find the previous region 200 // in reverse scan, so we check the endKey first. In general, the condition should be 201 // startKey < req.row and endKey >= req.row. Here we split it to endKey == req.row || 202 // (endKey > req.row && startKey < req.row). The two conditions are equal since startKey < 203 // endKey. 204 byte[] endKey = loc.getRegion().getEndKey(); 205 int c = Bytes.compareTo(endKey, req.row); 206 completed = c == 0 || ((c > 0 || Bytes.equals(EMPTY_END_ROW, endKey)) 207 && Bytes.compareTo(loc.getRegion().getStartKey(), req.row) < 0); 208 } else { 209 completed = loc.getRegion().containsRow(req.row); 210 } 211 if (completed) { 212 futureResultList.add(new RegionLocationsFutureResult(future, locations, null)); 213 return true; 214 } else { 215 return false; 216 } 217 } 218 } 219 220 AsyncNonMetaRegionLocator(AsyncConnectionImpl conn) { 221 this.conn = conn; 222 this.maxConcurrentLocateRequestPerTable = conn.getConfiguration().getInt( 223 MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); 224 this.locatePrefetchLimit = 225 conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); 226 227 // Get the region locator's meta replica mode. 228 this.metaReplicaMode = CatalogReplicaMode.fromString( 229 conn.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); 230 231 switch (this.metaReplicaMode) { 232 case LOAD_BALANCE: 233 String replicaSelectorClass = 234 conn.getConfiguration().get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR, 235 CatalogReplicaLoadBalanceSimpleSelector.class.getName()); 236 237 this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory 238 .createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> { 239 int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS; 240 try { 241 RegionLocations metaLocations = conn.registry.getMetaRegionLocations() 242 .get(conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS); 243 numOfReplicas = metaLocations.size(); 244 } catch (Exception e) { 245 LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e); 246 } 247 return numOfReplicas; 248 }); 249 break; 250 case NONE: 251 // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config. 252 boolean useMetaReplicas = 253 conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS); 254 if (useMetaReplicas) { 255 this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ; 256 } 257 break; 258 default: 259 // Doing nothing 260 } 261 } 262 263 private TableCache getTableCache(TableName tableName) { 264 return computeIfAbsent(cache, tableName, TableCache::new); 265 } 266 267 private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { 268 HRegionLocation[] locArr1 = locs1.getRegionLocations(); 269 HRegionLocation[] locArr2 = locs2.getRegionLocations(); 270 if (locArr1.length != locArr2.length) { 271 return false; 272 } 273 for (int i = 0; i < locArr1.length; i++) { 274 // do not need to compare region info 275 HRegionLocation loc1 = locArr1[i]; 276 HRegionLocation loc2 = locArr2[i]; 277 if (loc1 == null) { 278 if (loc2 != null) { 279 return false; 280 } 281 } else { 282 if (loc2 == null) { 283 return false; 284 } 285 if (loc1.getSeqNum() != loc2.getSeqNum()) { 286 return false; 287 } 288 if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) { 289 return false; 290 } 291 } 292 } 293 return true; 294 } 295 296 // if we successfully add the locations to cache, return the locations, otherwise return the one 297 // which prevents us being added. The upper layer can use this value to complete pending requests. 298 private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) { 299 LOG.trace("Try adding {} to cache", locs); 300 byte[] startKey = locs.getRegionLocation().getRegion().getStartKey(); 301 for (;;) { 302 RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); 303 if (oldLocs == null) { 304 return locs; 305 } 306 // check whether the regions are the same, this usually happens when table is split/merged, or 307 // deleted and recreated again. 308 RegionInfo region = locs.getRegionLocation().getRegion(); 309 RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion(); 310 if (region.getEncodedName().equals(oldRegion.getEncodedName())) { 311 RegionLocations mergedLocs = oldLocs.mergeLocations(locs); 312 if (isEqual(mergedLocs, oldLocs)) { 313 // the merged one is the same with the old one, give up 314 LOG.trace("Will not add {} to cache because the old value {} " 315 + " is newer than us or has the same server name." 316 + " Maybe it is updated before we replace it", locs, oldLocs); 317 return oldLocs; 318 } 319 if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { 320 return mergedLocs; 321 } 322 } else { 323 // the region is different, here we trust the one we fetched. This maybe wrong but finally 324 // the upper layer can detect this and trigger removal of the wrong locations 325 if (LOG.isDebugEnabled()) { 326 LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}'," 327 + " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey)); 328 } 329 if (tableCache.cache.replace(startKey, oldLocs, locs)) { 330 return locs; 331 } 332 } 333 } 334 } 335 336 private void complete(TableName tableName, LocateRequest req, RegionLocations locs, 337 Throwable error) { 338 if (error != null) { 339 LOG.warn("Failed to locate region in '" + tableName + "', row='" 340 + Bytes.toStringBinary(req.row) + "', locateType=" + req.locateType, error); 341 } 342 Optional<LocateRequest> toSend = Optional.empty(); 343 TableCache tableCache = getTableCache(tableName); 344 if (locs != null) { 345 RegionLocations addedLocs = addToCache(tableCache, locs); 346 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 347 synchronized (tableCache) { 348 tableCache.pendingRequests.remove(req); 349 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 350 // Remove a complete locate request in a synchronized block, so the table cache must have 351 // quota to send a candidate request. 352 toSend = tableCache.getCandidate(); 353 toSend.ifPresent(r -> tableCache.send(r)); 354 } 355 futureResultList.forEach(RegionLocationsFutureResult::complete); 356 toSend.ifPresent(r -> locateInMeta(tableName, r)); 357 } else { 358 // we meet an error 359 assert error != null; 360 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 361 synchronized (tableCache) { 362 tableCache.pendingRequests.remove(req); 363 // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have 364 // already retried several times 365 CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req); 366 if (future != null) { 367 futureResultList.add(new RegionLocationsFutureResult(future, null, error)); 368 } 369 futureResultList.addAll(tableCache.clearCompletedRequests(null)); 370 // Remove a complete locate request in a synchronized block, so the table cache must have 371 // quota to send a candidate request. 372 toSend = tableCache.getCandidate(); 373 toSend.ifPresent(r -> tableCache.send(r)); 374 } 375 futureResultList.forEach(RegionLocationsFutureResult::complete); 376 toSend.ifPresent(r -> locateInMeta(tableName, r)); 377 } 378 } 379 380 // return whether we should stop the scan 381 private boolean onScanNext(TableName tableName, LocateRequest req, Result result) { 382 RegionLocations locs = CatalogFamilyFormat.getRegionLocations(result); 383 if (LOG.isDebugEnabled()) { 384 LOG.debug("The fetched location of '{}', row='{}', locateType={} is {}", tableName, 385 Bytes.toStringBinary(req.row), req.locateType, locs); 386 } 387 // remove HRegionLocation with null location, i.e, getServerName returns null. 388 if (locs != null) { 389 locs = locs.removeElementsWithNullLocation(); 390 } 391 392 // the default region location should always be presented when fetching from meta, otherwise 393 // let's fail the request. 394 if (locs == null || locs.getDefaultRegionLocation() == null) { 395 complete(tableName, req, null, 396 new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", 397 tableName, Bytes.toStringBinary(req.row), req.locateType))); 398 return true; 399 } 400 HRegionLocation loc = locs.getDefaultRegionLocation(); 401 RegionInfo info = loc.getRegion(); 402 if (info == null) { 403 complete(tableName, req, null, 404 new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", 405 tableName, Bytes.toStringBinary(req.row), req.locateType))); 406 return true; 407 } 408 if (info.isSplitParent()) { 409 return false; 410 } 411 complete(tableName, req, locs, null); 412 return true; 413 } 414 415 private void recordCacheHit() { 416 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit); 417 } 418 419 private void recordCacheMiss() { 420 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss); 421 } 422 423 private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, 424 int replicaId) { 425 Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row); 426 if (entry == null) { 427 recordCacheMiss(); 428 return null; 429 } 430 RegionLocations locs = entry.getValue(); 431 HRegionLocation loc = locs.getRegionLocation(replicaId); 432 if (loc == null) { 433 recordCacheMiss(); 434 return null; 435 } 436 byte[] endKey = loc.getRegion().getEndKey(); 437 if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { 438 if (LOG.isTraceEnabled()) { 439 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 440 Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); 441 } 442 recordCacheHit(); 443 return locs; 444 } else { 445 recordCacheMiss(); 446 return null; 447 } 448 } 449 450 private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, 451 byte[] row, int replicaId) { 452 boolean isEmptyStopRow = isEmptyStopRow(row); 453 Map.Entry<byte[], RegionLocations> entry = 454 isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); 455 if (entry == null) { 456 recordCacheMiss(); 457 return null; 458 } 459 RegionLocations locs = entry.getValue(); 460 HRegionLocation loc = locs.getRegionLocation(replicaId); 461 if (loc == null) { 462 recordCacheMiss(); 463 return null; 464 } 465 if ( 466 isEmptyStopRow(loc.getRegion().getEndKey()) 467 || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0) 468 ) { 469 if (LOG.isTraceEnabled()) { 470 LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, 471 Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); 472 } 473 recordCacheHit(); 474 return locs; 475 } else { 476 recordCacheMiss(); 477 return null; 478 } 479 } 480 481 private void locateInMeta(TableName tableName, LocateRequest req) { 482 if (LOG.isTraceEnabled()) { 483 LOG.trace("Try locate '" + tableName + "', row='" + Bytes.toStringBinary(req.row) 484 + "', locateType=" + req.locateType + " in meta"); 485 } 486 byte[] metaStartKey; 487 if (req.locateType.equals(RegionLocateType.BEFORE)) { 488 if (isEmptyStopRow(req.row)) { 489 byte[] binaryTableName = tableName.getName(); 490 metaStartKey = Arrays.copyOf(binaryTableName, binaryTableName.length + 1); 491 } else { 492 metaStartKey = createRegionName(tableName, req.row, ZEROES, false); 493 } 494 } else { 495 metaStartKey = createRegionName(tableName, req.row, NINES, false); 496 } 497 byte[] metaStopKey = 498 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 499 Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 500 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) 501 .setReadType(ReadType.PREAD); 502 503 switch (this.metaReplicaMode) { 504 case LOAD_BALANCE: 505 int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType); 506 if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { 507 // If the selector gives a non-primary meta replica region, then go with it. 508 // Otherwise, just go to primary in non-hedgedRead mode. 509 scan.setConsistency(Consistency.TIMELINE); 510 scan.setReplicaId(metaReplicaId); 511 } 512 break; 513 case HEDGED_READ: 514 scan.setConsistency(Consistency.TIMELINE); 515 break; 516 default: 517 // do nothing 518 } 519 520 conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() { 521 522 private boolean completeNormally = false; 523 524 private boolean tableNotFound = true; 525 526 @Override 527 public void onError(Throwable error) { 528 complete(tableName, req, null, error); 529 } 530 531 @Override 532 public void onComplete() { 533 if (tableNotFound) { 534 complete(tableName, req, null, new TableNotFoundException(tableName)); 535 } else if (!completeNormally) { 536 complete(tableName, req, null, new IOException( 537 "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName)); 538 } 539 } 540 541 @Override 542 public void onNext(Result[] results, ScanController controller) { 543 if (results.length == 0) { 544 return; 545 } 546 tableNotFound = false; 547 int i = 0; 548 for (; i < results.length; i++) { 549 if (onScanNext(tableName, req, results[i])) { 550 completeNormally = true; 551 controller.terminate(); 552 i++; 553 break; 554 } 555 } 556 // Add the remaining results into cache 557 if (i < results.length) { 558 TableCache tableCache = getTableCache(tableName); 559 for (; i < results.length; i++) { 560 RegionLocations locs = CatalogFamilyFormat.getRegionLocations(results[i]); 561 if (locs == null) { 562 continue; 563 } 564 HRegionLocation loc = locs.getDefaultRegionLocation(); 565 if (loc == null) { 566 continue; 567 } 568 RegionInfo info = loc.getRegion(); 569 if (info == null || info.isOffline() || info.isSplitParent()) { 570 continue; 571 } 572 RegionLocations addedLocs = addToCache(tableCache, locs); 573 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 574 synchronized (tableCache) { 575 futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); 576 } 577 futureResultList.forEach(RegionLocationsFutureResult::complete); 578 } 579 } 580 } 581 }); 582 } 583 584 private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, 585 int replicaId, RegionLocateType locateType) { 586 return locateType.equals(RegionLocateType.BEFORE) 587 ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) 588 : locateRowInCache(tableCache, tableName, row, replicaId); 589 } 590 591 // locateToPrevious is true means we will use the start key of a region to locate the region 592 // placed before it. Used for reverse scan. See the comment of 593 // AsyncRegionLocator.getPreviousRegionLocation. 594 private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName tableName, 595 byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { 596 // AFTER should be convert to CURRENT before calling this method 597 assert !locateType.equals(RegionLocateType.AFTER); 598 TableCache tableCache = getTableCache(tableName); 599 if (!reload) { 600 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 601 if (isGood(locs, replicaId)) { 602 return CompletableFuture.completedFuture(locs); 603 } 604 } 605 CompletableFuture<RegionLocations> future; 606 LocateRequest req; 607 boolean sendRequest = false; 608 synchronized (tableCache) { 609 // check again 610 if (!reload) { 611 RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); 612 if (isGood(locs, replicaId)) { 613 return CompletableFuture.completedFuture(locs); 614 } 615 } 616 req = new LocateRequest(row, locateType); 617 future = tableCache.allRequests.get(req); 618 if (future == null) { 619 future = new CompletableFuture<>(); 620 tableCache.allRequests.put(req, future); 621 if (tableCache.hasQuota(maxConcurrentLocateRequestPerTable) && !tableCache.isPending(req)) { 622 tableCache.send(req); 623 sendRequest = true; 624 } 625 } 626 } 627 if (sendRequest) { 628 locateInMeta(tableName, req); 629 } 630 return future; 631 } 632 633 CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, 634 int replicaId, RegionLocateType locateType, boolean reload) { 635 // as we know the exact row after us, so we can just create the new row, and use the same 636 // algorithm to locate it. 637 if (locateType.equals(RegionLocateType.AFTER)) { 638 row = createClosestRowAfter(row); 639 locateType = RegionLocateType.CURRENT; 640 } 641 return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); 642 } 643 644 private void recordClearRegionCache() { 645 conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion); 646 } 647 648 private void removeLocationFromCache(HRegionLocation loc) { 649 TableCache tableCache = cache.get(loc.getRegion().getTable()); 650 if (tableCache == null) { 651 return; 652 } 653 byte[] startKey = loc.getRegion().getStartKey(); 654 for (;;) { 655 RegionLocations oldLocs = tableCache.cache.get(startKey); 656 if (oldLocs == null) { 657 return; 658 } 659 HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); 660 if (!canUpdateOnError(loc, oldLoc)) { 661 return; 662 } 663 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 664 // with timestamp internally. Next time the client looks up the same location, 665 // it will pick a different meta replica region. 666 if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 667 metaReplicaSelector.onError(loc); 668 } 669 670 RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); 671 if (newLocs == null) { 672 if (tableCache.cache.remove(startKey, oldLocs)) { 673 recordClearRegionCache(); 674 return; 675 } 676 } else { 677 if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { 678 recordClearRegionCache(); 679 return; 680 } 681 } 682 } 683 } 684 685 void addLocationToCache(HRegionLocation loc) { 686 addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); 687 } 688 689 private HRegionLocation getCachedLocation(HRegionLocation loc) { 690 TableCache tableCache = cache.get(loc.getRegion().getTable()); 691 if (tableCache == null) { 692 return null; 693 } 694 RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); 695 return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; 696 } 697 698 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { 699 Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics(); 700 AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, 701 this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null)); 702 } 703 704 void clearCache(TableName tableName) { 705 TableCache tableCache = cache.remove(tableName); 706 if (tableCache == null) { 707 return; 708 } 709 List<RegionLocationsFutureResult> futureResultList = new ArrayList<>(); 710 synchronized (tableCache) { 711 if (!tableCache.allRequests.isEmpty()) { 712 IOException error = new IOException("Cache cleared"); 713 tableCache.allRequests.values().forEach(f -> { 714 futureResultList.add(new RegionLocationsFutureResult(f, null, error)); 715 }); 716 } 717 } 718 futureResultList.forEach(RegionLocationsFutureResult::complete); 719 conn.getConnectionMetrics() 720 .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); 721 } 722 723 void clearCache() { 724 cache.clear(); 725 } 726 727 void clearCache(ServerName serverName) { 728 for (TableCache tableCache : cache.values()) { 729 for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) { 730 byte[] regionName = entry.getKey(); 731 RegionLocations locs = entry.getValue(); 732 RegionLocations newLocs = locs.removeByServer(serverName); 733 if (locs == newLocs) { 734 continue; 735 } 736 if (newLocs.isEmpty()) { 737 tableCache.cache.remove(regionName, locs); 738 } else { 739 tableCache.cache.replace(regionName, locs, newLocs); 740 } 741 } 742 } 743 } 744 745 // only used for testing whether we have cached the location for a region. 746 RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { 747 TableCache tableCache = cache.get(tableName); 748 if (tableCache == null) { 749 return null; 750 } 751 return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 752 } 753 754 // only used for testing whether we have cached the location for a table. 755 int getNumberOfCachedRegionLocations(TableName tableName) { 756 TableCache tableCache = cache.get(tableName); 757 if (tableCache == null) { 758 return 0; 759 } 760 return tableCache.cache.values().stream().mapToInt(RegionLocations::numNonNullElements).sum(); 761 } 762}