001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022 023import java.io.Closeable; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.stream.Collectors; 032import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; 033import org.apache.hadoop.hbase.client.AsyncTable; 034import org.apache.hadoop.hbase.client.Consistency; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.RegionInfo; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Scan.ReadType; 040import org.apache.hadoop.hbase.client.TableState; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.apache.hadoop.hbase.util.Pair; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * The (asynchronous) meta table accessor used at client side. Used to read/write region and 049 * assignment information store in <code>hbase:meta</code>. 050 * @since 2.0.0 051 * @see CatalogFamilyFormat 052 */ 053@InterfaceAudience.Private 054public final class ClientMetaTableAccessor { 055 056 private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class); 057 058 private ClientMetaTableAccessor() { 059 } 060 061 @InterfaceAudience.Private 062 @SuppressWarnings("ImmutableEnumChecker") 063 public enum QueryType { 064 ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), 065 REGION(HConstants.CATALOG_FAMILY), 066 TABLE(HConstants.TABLE_FAMILY), 067 REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY); 068 069 private final byte[][] families; 070 071 QueryType(byte[]... families) { 072 this.families = families; 073 } 074 075 byte[][] getFamilies() { 076 return this.families; 077 } 078 } 079 080 public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable, 081 TableName tableName) { 082 return getTableState(metaTable, tableName).thenApply(Optional::isPresent); 083 } 084 085 public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable, 086 TableName tableName) { 087 CompletableFuture<Optional<TableState>> future = new CompletableFuture<>(); 088 Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY, 089 HConstants.TABLE_STATE_QUALIFIER); 090 addListener(metaTable.get(get), (result, error) -> { 091 if (error != null) { 092 future.completeExceptionally(error); 093 return; 094 } 095 try { 096 future.complete(getTableState(result)); 097 } catch (IOException e) { 098 future.completeExceptionally(e); 099 } 100 }); 101 return future; 102 } 103 104 /** Returns the HRegionLocation from meta for the given region */ 105 public static CompletableFuture<Optional<HRegionLocation>> 106 getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) { 107 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); 108 try { 109 RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName); 110 addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo)) 111 .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> { 112 if (err != null) { 113 future.completeExceptionally(err); 114 return; 115 } 116 future.complete(getRegionLocations(r) 117 .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId()))); 118 }); 119 } catch (IOException parseEx) { 120 LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName)); 121 future.completeExceptionally(parseEx); 122 } 123 return future; 124 } 125 126 /** Returns the HRegionLocation from meta for the given encoded region name */ 127 public static CompletableFuture<Optional<HRegionLocation>> 128 getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) { 129 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>(); 130 addListener( 131 metaTable 132 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)), 133 (results, err) -> { 134 if (err != null) { 135 future.completeExceptionally(err); 136 return; 137 } 138 String encodedRegionNameStr = Bytes.toString(encodedRegionName); 139 results.stream().filter(result -> !result.isEmpty()) 140 .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> { 141 getRegionLocations(result).ifPresent(locations -> { 142 for (HRegionLocation location : locations.getRegionLocations()) { 143 if ( 144 location != null 145 && encodedRegionNameStr.equals(location.getRegion().getEncodedName()) 146 ) { 147 future.complete(Optional.of(location)); 148 return; 149 } 150 } 151 }); 152 }); 153 future.complete(Optional.empty()); 154 }); 155 return future; 156 } 157 158 private static Optional<TableState> getTableState(Result r) throws IOException { 159 return Optional.ofNullable(CatalogFamilyFormat.getTableState(r)); 160 } 161 162 /** 163 * Used to get all region locations for the specific table 164 * @param metaTable scanner over meta table 165 * @param tableName table we're looking for, can be null for getting all regions 166 * @return the list of region locations. The return value will be wrapped by a 167 * {@link CompletableFuture}. 168 */ 169 public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations( 170 AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) { 171 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); 172 addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> { 173 if (err != null) { 174 future.completeExceptionally(err); 175 } else if (locations == null || locations.isEmpty()) { 176 future.complete(Collections.emptyList()); 177 } else { 178 List<HRegionLocation> regionLocations = 179 locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) 180 .collect(Collectors.toList()); 181 future.complete(regionLocations); 182 } 183 }); 184 return future; 185 } 186 187 /** 188 * Used to get table regions' info and server. 189 * @param metaTable scanner over meta table 190 * @param tableName table we're looking for, can be null for getting all regions 191 * @param excludeOfflinedSplitParents don't return split parents 192 * @return the list of regioninfos and server. The return value will be wrapped by a 193 * {@link CompletableFuture}. 194 */ 195 private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations( 196 final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName, 197 final boolean excludeOfflinedSplitParents) { 198 CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>(); 199 if (TableName.META_TABLE_NAME.equals(tableName)) { 200 future.completeExceptionally(new IOException( 201 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead")); 202 } 203 204 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress 205 CollectRegionLocationsVisitor visitor = 206 new CollectRegionLocationsVisitor(excludeOfflinedSplitParents); 207 208 addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> { 209 if (error != null) { 210 future.completeExceptionally(error); 211 return; 212 } 213 future.complete(visitor.getResults()); 214 }); 215 return future; 216 } 217 218 /** 219 * Performs a scan of META table for given table. 220 * @param metaTable scanner over meta table 221 * @param tableName table within we scan 222 * @param type scanned part of meta 223 * @param visitor Visitor invoked against each row 224 */ 225 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, 226 TableName tableName, QueryType type, final Visitor visitor) { 227 return scanMeta(metaTable, getTableStartRowForMeta(tableName, type), 228 getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor); 229 } 230 231 /** 232 * Performs a scan of META table for given table. 233 * @param metaTable scanner over meta table 234 * @param startRow Where to start the scan 235 * @param stopRow Where to stop the scan 236 * @param type scanned part of meta 237 * @param maxRows maximum rows to return 238 * @param visitor Visitor invoked against each row 239 */ 240 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable, 241 byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) { 242 int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; 243 Scan scan = getMetaScan(metaTable, rowUpperLimit); 244 for (byte[] family : type.getFamilies()) { 245 scan.addFamily(family); 246 } 247 if (startRow != null) { 248 scan.withStartRow(startRow); 249 } 250 if (stopRow != null) { 251 scan.withStopRow(stopRow); 252 } 253 254 if (LOG.isDebugEnabled()) { 255 LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow()) 256 + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max=" 257 + rowUpperLimit + " with caching=" + scan.getCaching()); 258 } 259 260 CompletableFuture<Void> future = new CompletableFuture<Void>(); 261 // Get the region locator's meta replica mode. 262 CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration() 263 .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString())); 264 265 if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 266 addListener(metaTable.getDescriptor(), (desc, error) -> { 267 if (error != null) { 268 LOG.error("Failed to get meta table descriptor, error: ", error); 269 future.completeExceptionally(error); 270 return; 271 } 272 273 int numOfReplicas = desc.getRegionReplication(); 274 if (numOfReplicas > 1) { 275 int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas); 276 277 // When the replicaId is 0, do not set to Consistency.TIMELINE 278 if (replicaId > 0) { 279 scan.setReplicaId(replicaId); 280 scan.setConsistency(Consistency.TIMELINE); 281 } 282 } 283 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); 284 }); 285 } else { 286 if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) { 287 scan.setConsistency(Consistency.TIMELINE); 288 } 289 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future)); 290 } 291 292 return future; 293 } 294 295 private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer { 296 297 private int currentRowCount; 298 299 private final int rowUpperLimit; 300 301 private final Visitor visitor; 302 303 private final CompletableFuture<Void> future; 304 305 MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor, 306 CompletableFuture<Void> future) { 307 this.rowUpperLimit = rowUpperLimit; 308 this.visitor = visitor; 309 this.future = future; 310 this.currentRowCount = 0; 311 } 312 313 @Override 314 public void onError(Throwable error) { 315 future.completeExceptionally(error); 316 } 317 318 @Override 319 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION", 320 justification = "https://github.com/findbugsproject/findbugs/issues/79") 321 public void onComplete() { 322 future.complete(null); 323 } 324 325 @Override 326 public void onNext(Result[] results, ScanController controller) { 327 boolean terminateScan = false; 328 for (Result result : results) { 329 try { 330 if (!visitor.visit(result)) { 331 terminateScan = true; 332 break; 333 } 334 } catch (Exception e) { 335 future.completeExceptionally(e); 336 terminateScan = true; 337 break; 338 } 339 if (++currentRowCount >= rowUpperLimit) { 340 terminateScan = true; 341 break; 342 } 343 } 344 if (terminateScan) { 345 controller.terminate(); 346 } 347 } 348 } 349 350 /** 351 * Implementations 'visit' a catalog table row. 352 */ 353 public interface Visitor { 354 /** 355 * Visit the catalog table row. 356 * @param r A row from catalog table 357 * @return True if we are to proceed scanning the table, else false if we are to stop now. 358 */ 359 boolean visit(final Result r) throws IOException; 360 } 361 362 /** 363 * Implementations 'visit' a catalog table row but with close() at the end. 364 */ 365 public interface CloseableVisitor extends Visitor, Closeable { 366 } 367 368 /** 369 * A {@link Visitor} that collects content out of passed {@link Result}. 370 */ 371 private static abstract class CollectingVisitor<T> implements Visitor { 372 final List<T> results = new ArrayList<>(); 373 374 @Override 375 public boolean visit(Result r) throws IOException { 376 if (r != null && !r.isEmpty()) { 377 add(r); 378 } 379 return true; 380 } 381 382 abstract void add(Result r); 383 384 /** Returns Collected results; wait till visits complete to collect all possible results */ 385 List<T> getResults() { 386 return this.results; 387 } 388 } 389 390 static class CollectRegionLocationsVisitor 391 extends CollectingVisitor<Pair<RegionInfo, ServerName>> { 392 393 private final boolean excludeOfflinedSplitParents; 394 395 private RegionLocations current = null; 396 397 CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) { 398 this.excludeOfflinedSplitParents = excludeOfflinedSplitParents; 399 } 400 401 @Override 402 public boolean visit(Result r) throws IOException { 403 Optional<RegionLocations> currentRegionLocations = getRegionLocations(r); 404 current = currentRegionLocations.orElse(null); 405 if (current == null || current.getRegionLocation().getRegion() == null) { 406 LOG.warn("No serialized RegionInfo in " + r); 407 return true; 408 } 409 RegionInfo hri = current.getRegionLocation().getRegion(); 410 if (excludeOfflinedSplitParents && hri.isSplitParent()) { 411 return true; 412 } 413 // Else call super and add this Result to the collection. 414 return super.visit(r); 415 } 416 417 @Override 418 void add(Result r) { 419 if (current == null) { 420 return; 421 } 422 for (HRegionLocation loc : current.getRegionLocations()) { 423 if (loc != null) { 424 this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName())); 425 } 426 } 427 } 428 } 429 430 /** 431 * Collects all returned. 432 */ 433 static class CollectAllVisitor extends CollectingVisitor<Result> { 434 @Override 435 void add(Result r) { 436 this.results.add(r); 437 } 438 } 439 440 private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) { 441 Scan scan = new Scan(); 442 int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING, 443 HConstants.DEFAULT_HBASE_META_SCANNER_CACHING); 444 if ( 445 metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS, 446 HConstants.DEFAULT_USE_META_REPLICAS) 447 ) { 448 scan.setConsistency(Consistency.TIMELINE); 449 } 450 if (rowUpperLimit <= scannerCaching) { 451 scan.setLimit(rowUpperLimit); 452 } 453 int rows = Math.min(rowUpperLimit, scannerCaching); 454 scan.setCaching(rows); 455 return scan; 456 } 457 458 /** Returns an HRegionLocationList extracted from the result. */ 459 private static Optional<RegionLocations> getRegionLocations(Result r) { 460 return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r)); 461 } 462 463 /** Returns start row for scanning META according to query type */ 464 public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) { 465 if (tableName == null) { 466 return null; 467 } 468 switch (type) { 469 case REGION: 470 case REPLICATION: { 471 byte[] startRow = new byte[tableName.getName().length + 2]; 472 System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length); 473 startRow[startRow.length - 2] = HConstants.DELIMITER; 474 startRow[startRow.length - 1] = HConstants.DELIMITER; 475 return startRow; 476 } 477 case ALL: 478 case TABLE: 479 default: { 480 return tableName.getName(); 481 } 482 } 483 } 484 485 /** Returns stop row for scanning META according to query type */ 486 public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) { 487 if (tableName == null) { 488 return null; 489 } 490 final byte[] stopRow; 491 switch (type) { 492 case REGION: 493 case REPLICATION: { 494 stopRow = new byte[tableName.getName().length + 3]; 495 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 496 stopRow[stopRow.length - 3] = ' '; 497 stopRow[stopRow.length - 2] = HConstants.DELIMITER; 498 stopRow[stopRow.length - 1] = HConstants.DELIMITER; 499 break; 500 } 501 case ALL: 502 case TABLE: 503 default: { 504 stopRow = new byte[tableName.getName().length + 1]; 505 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length); 506 stopRow[stopRow.length - 1] = ' '; 507 break; 508 } 509 } 510 return stopRow; 511 } 512}