001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.stream.Collectors; 032import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 033import org.apache.hadoop.hbase.client.AsyncTable; 034import org.apache.hadoop.hbase.client.Consistency; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Scan.ReadType; 040import org.apache.hadoop.hbase.client.TableState; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Pair; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * The (asynchronous) meta table accessor used at client side. Used to read/write region and 049 * assignment information store in <code>hbase:meta</code>. 050 * @since 2.0.0 051 * @see CatalogFamilyFormat 052 */ 053@InterfaceAudience.Private 054public final class ClientMetaTableAccessor { 055 056 private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class); 057 058 private ClientMetaTableAccessor() { 059 } 060 061 @InterfaceAudience.Private 062 @SuppressWarnings("ImmutableEnumChecker") 063 public enum QueryType { 064 ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), 065 REGION(HConstants.CATALOG_FAMILY), 066 TABLE(HConstants.TABLE_FAMILY), 067 REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY); 068 069 private final byte[][] families; 070 071 QueryType(byte[]... families) { 072 this.families = families; 073 } 074 075 byte[][] getFamilies() { 076 return this.families; 077 } 078 } 079 080 public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable, 081 TableName tableName) { 082 return getTableState(metaTable, tableName).thenApply(Optional::isPresent); 083 } 084 085 public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable, 086 TableName tableName) { 087 CompletableFuture<Optional<TableState>> future = new CompletableFuture<>(); 088 Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY, 089 HConstants.TABLE_STATE_QUALIFIER); 090 addListener(metaTable.get(get), (result, error) -> { 091 if (error != null) { 092 future.completeExceptionally(error); 093 return; 094 } 095 try { 096 future.complete(getTableState(result)); 097 } catch (IOException e) { 098 future.completeExceptionally(e); 099 } 100 }); 101 return future; 102 } 103 104 /** Returns the HRegionLocation from meta for the given region */ 105 public static CompletableFuture<Optional<HRegionLocation>> 106 getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) { 107 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); 108 try { 109 RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 110 addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo)) 111 .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> { 112 if (err != null) { 113 future.completeExceptionally(err); 114 return; 115 } 116 future.complete(getRegionLocations(r) 117 .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); 118 }); 119 } catch (IOException parseEx) { 120 LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName)); 121 future.completeExceptionally(parseEx); 122 } 123 return future; 124 } 125 126 /** Returns the HRegionLocation from meta for the given encoded region name */ 127 public static CompletableFuture<Optional<HRegionLocation>> 128 getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) { 129 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); 130 addListener( 131 metaTable 132 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)), 133 (results, err) -> { 134 if (err != null) { 135 future.completeExceptionally(err); 136 return; 137 } 138 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 139 results.stream().filter(result -> !result.isEmpty()) 140 .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> { 141 getRegionLocations(result).ifPresent(locations -> { 142 for (HRegionLocation location : locations.getRegionLocations()) { 143 if ( 144 location != null 145 && encodedRegionNameStr.equals(location.getRegion().getEncodedName()) 146 ) { 147 future.complete(Optional.of(location)); 148 return; 149 } 150 } 151 }); 152 }); 153 future.complete(Optional.empty()); 154 }); 155 return future; 156 } 157 158 private static Optional<TableState> getTableState(Result r) throws IOException { 159 return Optional.ofNullable(CatalogFamilyFormat.getTableState(r)); 160 } 161 162 /** 163 * Used to get all region locations for the specific table 164 * @param metaTable scanner over meta table 165 * @param tableName table we're looking for, can be null for getting all regions 166 * @return the list of region locations. The return value will be wrapped by a 167 * {@link CompletableFuture}. 168 */ 169 public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( 170 AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) { 171 return toHRegionLocations(getTableRegionsAndLocations(metaTable, tableName, true)); 172 } 173 174 /** 175 * Used to get a single-RPC, paginated slice of region locations for the specific table, starting 176 * at the meta row derived from {@code startKey} and capped at {@code rowLimit} regions. 177 * {@code startKey} must be a region start-key boundary (e.g. the end key of the previously 178 * visited region), or {@code null}/empty to start at the first region. 179 * @param metaTable scanner over meta table 180 * @param tableName table we're looking for 181 * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty 182 * starts from the first region 183 * @param rowLimit maximum number of meta rows to return; if {@code <= 0}, the underlying scan is 184 * unbounded 185 * @return the list of region locations. The return value will be wrapped by a 186 * {@link CompletableFuture}. 187 */ 188 public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( 189 AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName, byte[] startKey, 190 int rowLimit) { 191 return toHRegionLocations( 192 getTableRegionsAndLocations(metaTable, tableName, true, startKey, rowLimit)); 193 } 194 195 private static CompletableFuture<List<HRegionLocation>> 196 toHRegionLocations(CompletableFuture<List<Pair<RegionInfo, ServerName>>> source) { 197 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 198 addListener(source, (locations, err) -> { 199 if (err != null) { 200 future.completeExceptionally(err); 201 } else if (locations == null || locations.isEmpty()) { 202 future.complete(Collections.emptyList()); 203 } else { 204 List<HRegionLocation> regionLocations = 205 locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) 206 .collect(Collectors.toList()); 207 future.complete(regionLocations); 208 } 209 }); 210 return future; 211 } 212 213 /** 214 * Used to get table regions' info and server. 215 * @param metaTable scanner over meta table 216 * @param tableName table we're looking for, can be null for getting all regions 217 * @param excludeOfflinedSplitParents don't return split parents 218 * @return the list of regioninfos and server. The return value will be wrapped by a 219 * {@link CompletableFuture}. 220 */ 221 private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations( 222 final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName, 223 final boolean excludeOfflinedSplitParents) { 224 CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>(); 225 if (TableName.META_TABLE_NAME.equals(tableName)) { 226 future.completeExceptionally(new IOException( 227 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead")); 228 } 229 230 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress 231 CollectRegionLocationsVisitor visitor = 232 new CollectRegionLocationsVisitor(excludeOfflinedSplitParents); 233 234 addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> { 235 if (error != null) { 236 future.completeExceptionally(error); 237 return; 238 } 239 future.complete(visitor.getResults()); 240 }); 241 return future; 242 } 243 244 /** 245 * Variant of {@link #getTableRegionsAndLocations} that scans a bounded slice of meta starting at 246 * the row derived from {@code startKey} and stopping after at most {@code rowLimit} rows. 247 */ 248 private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations( 249 final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName, 250 final boolean excludeOfflinedSplitParents, final byte[] startKey, final int rowLimit) { 251 CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>(); 252 if (TableName.META_TABLE_NAME.equals(tableName)) { 253 future.completeExceptionally(new IOException( 254 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead")); 255 return future; 256 } 257 258 CollectRegionLocationsVisitor visitor = 259 new CollectRegionLocationsVisitor(excludeOfflinedSplitParents); 260 261 byte[] metaStart = (startKey == null || startKey.length == 0) 262 ? getTableStartRowForMeta(tableName, QueryType.REGION) 263 : RegionInfo.createRegionName(tableName, startKey, HConstants.ZEROES, false); 264 byte[] metaStop = getTableStopRowForMeta(tableName, QueryType.REGION); 265 266 addListener(scanMeta(metaTable, metaStart, metaStop, QueryType.REGION, rowLimit, true, visitor), 267 (v, error) -> { 268 if (error != null) { 269 future.completeExceptionally(error); 270 return; 271 } 272 future.complete(visitor.getResults()); 273 }); 274 return future; 275 } 276 277 /** 278 * Performs a scan of META table for given table. 279 * @param metaTable scanner over meta table 280 * @param tableName table within we scan 281 * @param type scanned part of meta 282 * @param visitor Visitor invoked against each row 283 */ 284 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, 285 TableName tableName, QueryType type, final Visitor visitor) { 286 return scanMeta(metaTable, getTableStartRowForMeta(tableName, type), 287 getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, false, visitor); 288 } 289 290 /** 291 * Performs a scan of META table for given table. 292 * @param metaTable scanner over meta table 293 * @param startRow Where to start the scan 294 * @param stopRow Where to stop the scan 295 * @param type scanned part of meta 296 * @param maxRows maximum rows to return 297 * @param isPagedScan when {@code true}, the scan is sized so the whole slice (up to 298 * {@code maxRows}) returns in a single ScannerNext RPC. When {@code false}, 299 * uses the configured {@code hbase.meta.scanner.caching}. 300 * @param visitor Visitor invoked against each row 301 */ 302 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, 303 byte[] startRow, byte[] stopRow, QueryType type, int maxRows, boolean isPagedScan, 304 final Visitor visitor) { 305 int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; 306 Scan scan = getMetaScan(metaTable, rowUpperLimit, isPagedScan); 307 for (byte[] family : type.getFamilies()) { 308 scan.addFamily(family); 309 } 310 if (startRow != null) { 311 scan.withStartRow(startRow); 312 } 313 if (stopRow != null) { 314 scan.withStopRow(stopRow); 315 } 316 317 if (LOG.isDebugEnabled()) { 318 LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow()) 319 + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max=" 320 + rowUpperLimit + " with caching=" + scan.getCaching()); 321 } 322 323 CompletableFuture<Void> future = new CompletableFuture<Void>(); 324 // Get the region locator's meta replica mode. 325 CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration() 326 .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); 327 328 if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 329 addListener(metaTable.getDescriptor(), (desc, error) -> { 330 if (error != null) { 331 LOG.error("Failed to get meta table descriptor, error: ", error); 332 future.completeExceptionally(error); 333 return; 334 } 335 336 int numOfReplicas = desc.getRegionReplication(); 337 if (numOfReplicas > 1) { 338 int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas); 339 340 // When the replicaId is 0, do not set to Consistency.TIMELINE 341 if (replicaId > 0) { 342 scan.setReplicaId(replicaId); 343 scan.setConsistency(Consistency.TIMELINE); 344 } 345 } 346 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); 347 }); 348 } else { 349 if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) { 350 scan.setConsistency(Consistency.TIMELINE); 351 } 352 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); 353 } 354 355 return future; 356 } 357 358 private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer { 359 360 private int currentRowCount; 361 362 private final int rowUpperLimit; 363 364 private final Visitor visitor; 365 366 private final CompletableFuture<Void> future; 367 368 MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor, 369 CompletableFuture<Void> future) { 370 this.rowUpperLimit = rowUpperLimit; 371 this.visitor = visitor; 372 this.future = future; 373 this.currentRowCount = 0; 374 } 375 376 @Override 377 public void onError(Throwable error) { 378 future.completeExceptionally(error); 379 } 380 381 @Override 382 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION", 383 justification = "https://github.com/findbugsproject/findbugs/issues/79") 384 public void onComplete() { 385 future.complete(null); 386 } 387 388 @Override 389 public void onNext(Result[] results, ScanController controller) { 390 boolean terminateScan = false; 391 for (Result result : results) { 392 try { 393 if (!visitor.visit(result)) { 394 terminateScan = true; 395 break; 396 } 397 } catch (Exception e) { 398 future.completeExceptionally(e); 399 terminateScan = true; 400 break; 401 } 402 if (++currentRowCount >= rowUpperLimit) { 403 terminateScan = true; 404 break; 405 } 406 } 407 if (terminateScan) { 408 controller.terminate(); 409 } 410 } 411 } 412 413 /** 414 * Implementations 'visit' a catalog table row. 415 */ 416 public interface Visitor { 417 /** 418 * Visit the catalog table row. 419 * @param r A row from catalog table 420 * @return True if we are to proceed scanning the table, else false if we are to stop now. 421 */ 422 boolean visit(final Result r) throws IOException; 423 } 424 425 /** 426 * Implementations 'visit' a catalog table row but with close() at the end. 427 */ 428 public interface CloseableVisitor extends Visitor, Closeable { 429 } 430 431 /** 432 * A {@link Visitor} that collects content out of passed {@link Result}. 433 */ 434 private static abstract class CollectingVisitor<T> implements Visitor { 435 final List<T> results = new ArrayList<>(); 436 437 @Override 438 public boolean visit(Result r) throws IOException { 439 if (r != null && !r.isEmpty()) { 440 add(r); 441 } 442 return true; 443 } 444 445 abstract void add(Result r); 446 447 /** Returns Collected results; wait till visits complete to collect all possible results */ 448 List<T> getResults() { 449 return this.results; 450 } 451 } 452 453 static class CollectRegionLocationsVisitor 454 extends CollectingVisitor<Pair<RegionInfo, ServerName>> { 455 456 private final boolean excludeOfflinedSplitParents; 457 458 private RegionLocations current = null; 459 460 CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) { 461 this.excludeOfflinedSplitParents = excludeOfflinedSplitParents; 462 } 463 464 @Override 465 public boolean visit(Result r) throws IOException { 466 Optional<RegionLocations> currentRegionLocations = getRegionLocations(r); 467 current = currentRegionLocations.orElse(null); 468 if (current == null || current.getRegionLocation().getRegion() == null) { 469 LOG.warn("No serialized RegionInfo in " + r); 470 return true; 471 } 472 RegionInfo hri = current.getRegionLocation().getRegion(); 473 if (excludeOfflinedSplitParents && hri.isSplitParent()) { 474 return true; 475 } 476 // Else call super and add this Result to the collection. 477 return super.visit(r); 478 } 479 480 @Override 481 void add(Result r) { 482 if (current == null) { 483 return; 484 } 485 for (HRegionLocation loc : current.getRegionLocations()) { 486 if (loc != null) { 487 this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName())); 488 } 489 } 490 } 491 } 492 493 /** 494 * Collects all returned. 495 */ 496 static class CollectAllVisitor extends CollectingVisitor<Result> { 497 @Override 498 void add(Result r) { 499 this.results.add(r); 500 } 501 } 502 503 private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit, boolean isPagedScan) { 504 Scan scan = new Scan(); 505 int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING, 506 HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); 507 if ( 508 metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS, 509 HConstants.DEFAULT_USE_META_REPLICAS) 510 ) { 511 scan.setConsistency(Consistency.TIMELINE); 512 } 513 if (isPagedScan) { 514 // Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext 515 // RPC. Size caching to the slice. Trade-off: a single larger response uses more RegionServer 516 // heap, fine for meta rows (small). 517 scan.setLimit(rowUpperLimit); 518 scan.setCaching(rowUpperLimit); 519 } else { 520 if (rowUpperLimit <= scannerCaching) { 521 scan.setLimit(rowUpperLimit); 522 } 523 scan.setCaching(Math.min(rowUpperLimit, scannerCaching)); 524 } 525 return scan; 526 } 527 528 /** Returns an HRegionLocationList extracted from the result. */ 529 private static Optional<RegionLocations> getRegionLocations(Result r) { 530 return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r)); 531 } 532 533 /** Returns start row for scanning META according to query type */ 534 public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) { 535 if (tableName == null) { 536 return null; 537 } 538 switch (type) { 539 case REGION: 540 case REPLICATION: { 541 byte[] startRow = new byte[tableName.getName().length + 2]; 542 System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length); 543 startRow[startRow.length - 2] = HConstants.DELIMITER; 544 startRow[startRow.length - 1] = HConstants.DELIMITER; 545 return startRow; 546 } 547 case ALL: 548 case TABLE: 549 default: { 550 return tableName.getName(); 551 } 552 } 553 } 554 555 /** Returns stop row for scanning META according to query type */ 556 public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) { 557 if (tableName == null) { 558 return null; 559 } 560 final byte[] stopRow; 561 switch (type) { 562 case REGION: 563 case REPLICATION: { 564 stopRow = new byte[tableName.getName().length + 3]; 565 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 566 stopRow[stopRow.length - 3] = ' '; 567 stopRow[stopRow.length - 2] = HConstants.DELIMITER; 568 stopRow[stopRow.length - 1] = HConstants.DELIMITER; 569 break; 570 } 571 case ALL: 572 case TABLE: 573 default: { 574 stopRow = new byte[tableName.getName().length + 1]; 575 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 576 stopRow[stopRow.length - 1] = ' '; 577 break; 578 } 579 } 580 return stopRow; 581 } 582}