001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.stream.Collectors;
032import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
033import org.apache.hadoop.hbase.client.AsyncTable;
034import org.apache.hadoop.hbase.client.Consistency;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Scan.ReadType;
040import org.apache.hadoop.hbase.client.TableState;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Pair;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * The (asynchronous) meta table accessor used at client side. Used to read/write region and
049 * assignment information store in <code>hbase:meta</code>.
050 * @since 2.0.0
051 * @see CatalogFamilyFormat
052 */
053@InterfaceAudience.Private
054public final class ClientMetaTableAccessor {
055
056  private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class);
057
058  private ClientMetaTableAccessor() {
059  }
060
061  @InterfaceAudience.Private
062  @SuppressWarnings("ImmutableEnumChecker")
063  public enum QueryType {
064    ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY),
065    REGION(HConstants.CATALOG_FAMILY),
066    TABLE(HConstants.TABLE_FAMILY),
067    REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
068
069    private final byte[][] families;
070
071    QueryType(byte[]... families) {
072      this.families = families;
073    }
074
075    byte[][] getFamilies() {
076      return this.families;
077    }
078  }
079
080  public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
081    TableName tableName) {
082    return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
083  }
084
085  public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
086    TableName tableName) {
087    CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
088    Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
089      HConstants.TABLE_STATE_QUALIFIER);
090    addListener(metaTable.get(get), (result, error) -> {
091      if (error != null) {
092        future.completeExceptionally(error);
093        return;
094      }
095      try {
096        future.complete(getTableState(result));
097      } catch (IOException e) {
098        future.completeExceptionally(e);
099      }
100    });
101    return future;
102  }
103
104  /** Returns the HRegionLocation from meta for the given region */
105  public static CompletableFuture<Optional<HRegionLocation>>
106    getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) {
107    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
108    try {
109      RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
110      addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo))
111        .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
112          if (err != null) {
113            future.completeExceptionally(err);
114            return;
115          }
116          future.complete(getRegionLocations(r)
117            .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
118        });
119    } catch (IOException parseEx) {
120      LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
121      future.completeExceptionally(parseEx);
122    }
123    return future;
124  }
125
126  /** Returns the HRegionLocation from meta for the given encoded region name */
127  public static CompletableFuture<Optional<HRegionLocation>>
128    getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) {
129    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
130    addListener(
131      metaTable
132        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
133      (results, err) -> {
134        if (err != null) {
135          future.completeExceptionally(err);
136          return;
137        }
138        String encodedRegionNameStr = Bytes.toString(encodedRegionName);
139        results.stream().filter(result -> !result.isEmpty())
140          .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> {
141            getRegionLocations(result).ifPresent(locations -> {
142              for (HRegionLocation location : locations.getRegionLocations()) {
143                if (
144                  location != null
145                    && encodedRegionNameStr.equals(location.getRegion().getEncodedName())
146                ) {
147                  future.complete(Optional.of(location));
148                  return;
149                }
150              }
151            });
152          });
153        future.complete(Optional.empty());
154      });
155    return future;
156  }
157
158  private static Optional<TableState> getTableState(Result r) throws IOException {
159    return Optional.ofNullable(CatalogFamilyFormat.getTableState(r));
160  }
161
162  /**
163   * Used to get all region locations for the specific table
164   * @param metaTable scanner over meta table
165   * @param tableName table we're looking for, can be null for getting all regions
166   * @return the list of region locations. The return value will be wrapped by a
167   *         {@link CompletableFuture}.
168   */
169  public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
170    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
171    return toHRegionLocations(getTableRegionsAndLocations(metaTable, tableName, true));
172  }
173
174  /**
175   * Used to get a single-RPC, paginated slice of region locations for the specific table, starting
176   * at the meta row derived from {@code startKey} and capped at {@code rowLimit} regions.
177   * {@code startKey} must be a region start-key boundary (e.g. the end key of the previously
178   * visited region), or {@code null}/empty to start at the first region.
179   * @param metaTable scanner over meta table
180   * @param tableName table we're looking for
181   * @param startKey  region start-key to begin scanning from (inclusive); {@code null} or empty
182   *                  starts from the first region
183   * @param rowLimit  maximum number of meta rows to return; if {@code <= 0}, the underlying scan is
184   *                  unbounded
185   * @return the list of region locations. The return value will be wrapped by a
186   *         {@link CompletableFuture}.
187   */
188  public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
189    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName, byte[] startKey,
190    int rowLimit) {
191    return toHRegionLocations(
192      getTableRegionsAndLocations(metaTable, tableName, true, startKey, rowLimit));
193  }
194
195  private static CompletableFuture<List<HRegionLocation>>
196    toHRegionLocations(CompletableFuture<List<Pair<RegionInfo, ServerName>>> source) {
197    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
198    addListener(source, (locations, err) -> {
199      if (err != null) {
200        future.completeExceptionally(err);
201      } else if (locations == null || locations.isEmpty()) {
202        future.complete(Collections.emptyList());
203      } else {
204        List<HRegionLocation> regionLocations =
205          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
206            .collect(Collectors.toList());
207        future.complete(regionLocations);
208      }
209    });
210    return future;
211  }
212
213  /**
214   * Used to get table regions' info and server.
215   * @param metaTable                   scanner over meta table
216   * @param tableName                   table we're looking for, can be null for getting all regions
217   * @param excludeOfflinedSplitParents don't return split parents
218   * @return the list of regioninfos and server. The return value will be wrapped by a
219   *         {@link CompletableFuture}.
220   */
221  private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
222    final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
223    final boolean excludeOfflinedSplitParents) {
224    CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
225    if (TableName.META_TABLE_NAME.equals(tableName)) {
226      future.completeExceptionally(new IOException(
227        "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
228    }
229
230    // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
231    CollectRegionLocationsVisitor visitor =
232      new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
233
234    addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
235      if (error != null) {
236        future.completeExceptionally(error);
237        return;
238      }
239      future.complete(visitor.getResults());
240    });
241    return future;
242  }
243
244  /**
245   * Variant of {@link #getTableRegionsAndLocations} that scans a bounded slice of meta starting at
246   * the row derived from {@code startKey} and stopping after at most {@code rowLimit} rows.
247   */
248  private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
249    final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
250    final boolean excludeOfflinedSplitParents, final byte[] startKey, final int rowLimit) {
251    CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
252    if (TableName.META_TABLE_NAME.equals(tableName)) {
253      future.completeExceptionally(new IOException(
254        "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
255      return future;
256    }
257
258    CollectRegionLocationsVisitor visitor =
259      new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
260
261    byte[] metaStart = (startKey == null || startKey.length == 0)
262      ? getTableStartRowForMeta(tableName, QueryType.REGION)
263      : RegionInfo.createRegionName(tableName, startKey, HConstants.ZEROES, false);
264    byte[] metaStop = getTableStopRowForMeta(tableName, QueryType.REGION);
265
266    addListener(scanMeta(metaTable, metaStart, metaStop, QueryType.REGION, rowLimit, true, visitor),
267      (v, error) -> {
268        if (error != null) {
269          future.completeExceptionally(error);
270          return;
271        }
272        future.complete(visitor.getResults());
273      });
274    return future;
275  }
276
277  /**
278   * Performs a scan of META table for given table.
279   * @param metaTable scanner over meta table
280   * @param tableName table within we scan
281   * @param type      scanned part of meta
282   * @param visitor   Visitor invoked against each row
283   */
284  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
285    TableName tableName, QueryType type, final Visitor visitor) {
286    return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
287      getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, false, visitor);
288  }
289
290  /**
291   * Performs a scan of META table for given table.
292   * @param metaTable   scanner over meta table
293   * @param startRow    Where to start the scan
294   * @param stopRow     Where to stop the scan
295   * @param type        scanned part of meta
296   * @param maxRows     maximum rows to return
297   * @param isPagedScan when {@code true}, the scan is sized so the whole slice (up to
298   *                    {@code maxRows}) returns in a single ScannerNext RPC. When {@code false},
299   *                    uses the configured {@code hbase.meta.scanner.caching}.
300   * @param visitor     Visitor invoked against each row
301   */
302  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
303    byte[] startRow, byte[] stopRow, QueryType type, int maxRows, boolean isPagedScan,
304    final Visitor visitor) {
305    int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
306    Scan scan = getMetaScan(metaTable, rowUpperLimit, isPagedScan);
307    for (byte[] family : type.getFamilies()) {
308      scan.addFamily(family);
309    }
310    if (startRow != null) {
311      scan.withStartRow(startRow);
312    }
313    if (stopRow != null) {
314      scan.withStopRow(stopRow);
315    }
316
317    if (LOG.isDebugEnabled()) {
318      LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
319        + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max="
320        + rowUpperLimit + " with caching=" + scan.getCaching());
321    }
322
323    CompletableFuture<Void> future = new CompletableFuture<Void>();
324    // Get the region locator's meta replica mode.
325    CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
326      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
327
328    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
329      addListener(metaTable.getDescriptor(), (desc, error) -> {
330        if (error != null) {
331          LOG.error("Failed to get meta table descriptor, error: ", error);
332          future.completeExceptionally(error);
333          return;
334        }
335
336        int numOfReplicas = desc.getRegionReplication();
337        if (numOfReplicas > 1) {
338          int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);
339
340          // When the replicaId is 0, do not set to Consistency.TIMELINE
341          if (replicaId > 0) {
342            scan.setReplicaId(replicaId);
343            scan.setConsistency(Consistency.TIMELINE);
344          }
345        }
346        metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
347      });
348    } else {
349      if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) {
350        scan.setConsistency(Consistency.TIMELINE);
351      }
352      metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
353    }
354
355    return future;
356  }
357
358  private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
359
360    private int currentRowCount;
361
362    private final int rowUpperLimit;
363
364    private final Visitor visitor;
365
366    private final CompletableFuture<Void> future;
367
368    MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
369      CompletableFuture<Void> future) {
370      this.rowUpperLimit = rowUpperLimit;
371      this.visitor = visitor;
372      this.future = future;
373      this.currentRowCount = 0;
374    }
375
376    @Override
377    public void onError(Throwable error) {
378      future.completeExceptionally(error);
379    }
380
381    @Override
382    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
383        justification = "https://github.com/findbugsproject/findbugs/issues/79")
384    public void onComplete() {
385      future.complete(null);
386    }
387
388    @Override
389    public void onNext(Result[] results, ScanController controller) {
390      boolean terminateScan = false;
391      for (Result result : results) {
392        try {
393          if (!visitor.visit(result)) {
394            terminateScan = true;
395            break;
396          }
397        } catch (Exception e) {
398          future.completeExceptionally(e);
399          terminateScan = true;
400          break;
401        }
402        if (++currentRowCount >= rowUpperLimit) {
403          terminateScan = true;
404          break;
405        }
406      }
407      if (terminateScan) {
408        controller.terminate();
409      }
410    }
411  }
412
413  /**
414   * Implementations 'visit' a catalog table row.
415   */
416  public interface Visitor {
417    /**
418     * Visit the catalog table row.
419     * @param r A row from catalog table
420     * @return True if we are to proceed scanning the table, else false if we are to stop now.
421     */
422    boolean visit(final Result r) throws IOException;
423  }
424
425  /**
426   * Implementations 'visit' a catalog table row but with close() at the end.
427   */
428  public interface CloseableVisitor extends Visitor, Closeable {
429  }
430
431  /**
432   * A {@link Visitor} that collects content out of passed {@link Result}.
433   */
434  private static abstract class CollectingVisitor<T> implements Visitor {
435    final List<T> results = new ArrayList<>();
436
437    @Override
438    public boolean visit(Result r) throws IOException {
439      if (r != null && !r.isEmpty()) {
440        add(r);
441      }
442      return true;
443    }
444
445    abstract void add(Result r);
446
447    /** Returns Collected results; wait till visits complete to collect all possible results */
448    List<T> getResults() {
449      return this.results;
450    }
451  }
452
453  static class CollectRegionLocationsVisitor
454    extends CollectingVisitor<Pair<RegionInfo, ServerName>> {
455
456    private final boolean excludeOfflinedSplitParents;
457
458    private RegionLocations current = null;
459
460    CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) {
461      this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
462    }
463
464    @Override
465    public boolean visit(Result r) throws IOException {
466      Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
467      current = currentRegionLocations.orElse(null);
468      if (current == null || current.getRegionLocation().getRegion() == null) {
469        LOG.warn("No serialized RegionInfo in " + r);
470        return true;
471      }
472      RegionInfo hri = current.getRegionLocation().getRegion();
473      if (excludeOfflinedSplitParents && hri.isSplitParent()) {
474        return true;
475      }
476      // Else call super and add this Result to the collection.
477      return super.visit(r);
478    }
479
480    @Override
481    void add(Result r) {
482      if (current == null) {
483        return;
484      }
485      for (HRegionLocation loc : current.getRegionLocations()) {
486        if (loc != null) {
487          this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName()));
488        }
489      }
490    }
491  }
492
493  /**
494   * Collects all returned.
495   */
496  static class CollectAllVisitor extends CollectingVisitor<Result> {
497    @Override
498    void add(Result r) {
499      this.results.add(r);
500    }
501  }
502
503  private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit, boolean isPagedScan) {
504    Scan scan = new Scan();
505    int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
506      HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
507    if (
508      metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
509        HConstants.DEFAULT_USE_META_REPLICAS)
510    ) {
511      scan.setConsistency(Consistency.TIMELINE);
512    }
513    if (isPagedScan) {
514      // Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext
515      // RPC. Size caching to the slice. Trade-off: a single larger response uses more RegionServer
516      // heap, fine for meta rows (small).
517      scan.setLimit(rowUpperLimit);
518      scan.setCaching(rowUpperLimit);
519    } else {
520      if (rowUpperLimit <= scannerCaching) {
521        scan.setLimit(rowUpperLimit);
522      }
523      scan.setCaching(Math.min(rowUpperLimit, scannerCaching));
524    }
525    return scan;
526  }
527
528  /** Returns an HRegionLocationList extracted from the result. */
529  private static Optional<RegionLocations> getRegionLocations(Result r) {
530    return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r));
531  }
532
533  /** Returns start row for scanning META according to query type */
534  public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
535    if (tableName == null) {
536      return null;
537    }
538    switch (type) {
539      case REGION:
540      case REPLICATION: {
541        byte[] startRow = new byte[tableName.getName().length + 2];
542        System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
543        startRow[startRow.length - 2] = HConstants.DELIMITER;
544        startRow[startRow.length - 1] = HConstants.DELIMITER;
545        return startRow;
546      }
547      case ALL:
548      case TABLE:
549      default: {
550        return tableName.getName();
551      }
552    }
553  }
554
555  /** Returns stop row for scanning META according to query type */
556  public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
557    if (tableName == null) {
558      return null;
559    }
560    final byte[] stopRow;
561    switch (type) {
562      case REGION:
563      case REPLICATION: {
564        stopRow = new byte[tableName.getName().length + 3];
565        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
566        stopRow[stopRow.length - 3] = ' ';
567        stopRow[stopRow.length - 2] = HConstants.DELIMITER;
568        stopRow[stopRow.length - 1] = HConstants.DELIMITER;
569        break;
570      }
571      case ALL:
572      case TABLE:
573      default: {
574        stopRow = new byte[tableName.getName().length + 1];
575        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
576        stopRow[stopRow.length - 1] = ' ';
577        break;
578      }
579    }
580    return stopRow;
581  }
582}