001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.Closeable;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.stream.Collectors;
032import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
033import org.apache.hadoop.hbase.client.AsyncTable;
034import org.apache.hadoop.hbase.client.Consistency;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Scan.ReadType;
040import org.apache.hadoop.hbase.client.TableState;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Pair;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
 * The (asynchronous) meta table accessor used at client side. Used to read/write region and
 * assignment information stored in <code>hbase:meta</code>.
050 * @since 2.0.0
051 * @see CatalogFamilyFormat
052 */
053@InterfaceAudience.Private
054public final class ClientMetaTableAccessor {
055
056  private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class);
057
058  private ClientMetaTableAccessor() {
059  }
060
061  @InterfaceAudience.Private
062  @SuppressWarnings("ImmutableEnumChecker")
063  public enum QueryType {
064    ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY),
065    REGION(HConstants.CATALOG_FAMILY),
066    TABLE(HConstants.TABLE_FAMILY),
067    REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
068
069    private final byte[][] families;
070
071    QueryType(byte[]... families) {
072      this.families = families;
073    }
074
075    byte[][] getFamilies() {
076      return this.families;
077    }
078  }
079
080  public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
081    TableName tableName) {
082    return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
083  }
084
085  public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
086    TableName tableName) {
087    CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
088    Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
089      HConstants.TABLE_STATE_QUALIFIER);
090    addListener(metaTable.get(get), (result, error) -> {
091      if (error != null) {
092        future.completeExceptionally(error);
093        return;
094      }
095      try {
096        future.complete(getTableState(result));
097      } catch (IOException e) {
098        future.completeExceptionally(e);
099      }
100    });
101    return future;
102  }
103
104  /** Returns the HRegionLocation from meta for the given region */
105  public static CompletableFuture<Optional<HRegionLocation>>
106    getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) {
107    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
108    try {
109      RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
110      addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo))
111        .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
112          if (err != null) {
113            future.completeExceptionally(err);
114            return;
115          }
116          future.complete(getRegionLocations(r)
117            .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
118        });
119    } catch (IOException parseEx) {
120      LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
121      future.completeExceptionally(parseEx);
122    }
123    return future;
124  }
125
126  /** Returns the HRegionLocation from meta for the given encoded region name */
127  public static CompletableFuture<Optional<HRegionLocation>>
128    getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) {
129    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
130    addListener(
131      metaTable
132        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
133      (results, err) -> {
134        if (err != null) {
135          future.completeExceptionally(err);
136          return;
137        }
138        String encodedRegionNameStr = Bytes.toString(encodedRegionName);
139        results.stream().filter(result -> !result.isEmpty())
140          .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> {
141            getRegionLocations(result).ifPresent(locations -> {
142              for (HRegionLocation location : locations.getRegionLocations()) {
143                if (
144                  location != null
145                    && encodedRegionNameStr.equals(location.getRegion().getEncodedName())
146                ) {
147                  future.complete(Optional.of(location));
148                  return;
149                }
150              }
151            });
152          });
153        future.complete(Optional.empty());
154      });
155    return future;
156  }
157
158  private static Optional<TableState> getTableState(Result r) throws IOException {
159    return Optional.ofNullable(CatalogFamilyFormat.getTableState(r));
160  }
161
162  /**
163   * Used to get all region locations for the specific table
164   * @param metaTable scanner over meta table
165   * @param tableName table we're looking for, can be null for getting all regions
166   * @return the list of region locations. The return value will be wrapped by a
167   *         {@link CompletableFuture}.
168   */
169  public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
170    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
171    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
172    addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
173      if (err != null) {
174        future.completeExceptionally(err);
175      } else if (locations == null || locations.isEmpty()) {
176        future.complete(Collections.emptyList());
177      } else {
178        List<HRegionLocation> regionLocations =
179          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
180            .collect(Collectors.toList());
181        future.complete(regionLocations);
182      }
183    });
184    return future;
185  }
186
187  /**
188   * Used to get table regions' info and server.
189   * @param metaTable                   scanner over meta table
190   * @param tableName                   table we're looking for, can be null for getting all regions
191   * @param excludeOfflinedSplitParents don't return split parents
192   * @return the list of regioninfos and server. The return value will be wrapped by a
193   *         {@link CompletableFuture}.
194   */
195  private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
196    final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
197    final boolean excludeOfflinedSplitParents) {
198    CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
199    if (TableName.META_TABLE_NAME.equals(tableName)) {
200      future.completeExceptionally(new IOException(
201        "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
202    }
203
204    // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
205    CollectRegionLocationsVisitor visitor =
206      new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
207
208    addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
209      if (error != null) {
210        future.completeExceptionally(error);
211        return;
212      }
213      future.complete(visitor.getResults());
214    });
215    return future;
216  }
217
218  /**
219   * Performs a scan of META table for given table.
220   * @param metaTable scanner over meta table
221   * @param tableName table within we scan
222   * @param type      scanned part of meta
223   * @param visitor   Visitor invoked against each row
224   */
225  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
226    TableName tableName, QueryType type, final Visitor visitor) {
227    return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
228      getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
229  }
230
231  /**
232   * Performs a scan of META table for given table.
233   * @param metaTable scanner over meta table
234   * @param startRow  Where to start the scan
235   * @param stopRow   Where to stop the scan
236   * @param type      scanned part of meta
237   * @param maxRows   maximum rows to return
238   * @param visitor   Visitor invoked against each row
239   */
240  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
241    byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
242    int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
243    Scan scan = getMetaScan(metaTable, rowUpperLimit);
244    for (byte[] family : type.getFamilies()) {
245      scan.addFamily(family);
246    }
247    if (startRow != null) {
248      scan.withStartRow(startRow);
249    }
250    if (stopRow != null) {
251      scan.withStopRow(stopRow);
252    }
253
254    if (LOG.isDebugEnabled()) {
255      LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
256        + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max="
257        + rowUpperLimit + " with caching=" + scan.getCaching());
258    }
259
260    CompletableFuture<Void> future = new CompletableFuture<Void>();
261    // Get the region locator's meta replica mode.
262    CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(metaTable.getConfiguration()
263      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
264
265    if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
266      addListener(metaTable.getDescriptor(), (desc, error) -> {
267        if (error != null) {
268          LOG.error("Failed to get meta table descriptor, error: ", error);
269          future.completeExceptionally(error);
270          return;
271        }
272
273        int numOfReplicas = desc.getRegionReplication();
274        if (numOfReplicas > 1) {
275          int replicaId = ThreadLocalRandom.current().nextInt(numOfReplicas);
276
277          // When the replicaId is 0, do not set to Consistency.TIMELINE
278          if (replicaId > 0) {
279            scan.setReplicaId(replicaId);
280            scan.setConsistency(Consistency.TIMELINE);
281          }
282        }
283        metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
284      });
285    } else {
286      if (metaReplicaMode == CatalogReplicaMode.HEDGED_READ) {
287        scan.setConsistency(Consistency.TIMELINE);
288      }
289      metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
290    }
291
292    return future;
293  }
294
295  private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
296
297    private int currentRowCount;
298
299    private final int rowUpperLimit;
300
301    private final Visitor visitor;
302
303    private final CompletableFuture<Void> future;
304
305    MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
306      CompletableFuture<Void> future) {
307      this.rowUpperLimit = rowUpperLimit;
308      this.visitor = visitor;
309      this.future = future;
310      this.currentRowCount = 0;
311    }
312
313    @Override
314    public void onError(Throwable error) {
315      future.completeExceptionally(error);
316    }
317
318    @Override
319    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
320        justification = "https://github.com/findbugsproject/findbugs/issues/79")
321    public void onComplete() {
322      future.complete(null);
323    }
324
325    @Override
326    public void onNext(Result[] results, ScanController controller) {
327      boolean terminateScan = false;
328      for (Result result : results) {
329        try {
330          if (!visitor.visit(result)) {
331            terminateScan = true;
332            break;
333          }
334        } catch (Exception e) {
335          future.completeExceptionally(e);
336          terminateScan = true;
337          break;
338        }
339        if (++currentRowCount >= rowUpperLimit) {
340          terminateScan = true;
341          break;
342        }
343      }
344      if (terminateScan) {
345        controller.terminate();
346      }
347    }
348  }
349
350  /**
351   * Implementations 'visit' a catalog table row.
352   */
353  public interface Visitor {
354    /**
355     * Visit the catalog table row.
356     * @param r A row from catalog table
357     * @return True if we are to proceed scanning the table, else false if we are to stop now.
358     */
359    boolean visit(final Result r) throws IOException;
360  }
361
362  /**
363   * Implementations 'visit' a catalog table row but with close() at the end.
364   */
365  public interface CloseableVisitor extends Visitor, Closeable {
366  }
367
368  /**
369   * A {@link Visitor} that collects content out of passed {@link Result}.
370   */
371  private static abstract class CollectingVisitor<T> implements Visitor {
372    final List<T> results = new ArrayList<>();
373
374    @Override
375    public boolean visit(Result r) throws IOException {
376      if (r != null && !r.isEmpty()) {
377        add(r);
378      }
379      return true;
380    }
381
382    abstract void add(Result r);
383
384    /** Returns Collected results; wait till visits complete to collect all possible results */
385    List<T> getResults() {
386      return this.results;
387    }
388  }
389
390  static class CollectRegionLocationsVisitor
391    extends CollectingVisitor<Pair<RegionInfo, ServerName>> {
392
393    private final boolean excludeOfflinedSplitParents;
394
395    private RegionLocations current = null;
396
397    CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) {
398      this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
399    }
400
401    @Override
402    public boolean visit(Result r) throws IOException {
403      Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
404      current = currentRegionLocations.orElse(null);
405      if (current == null || current.getRegionLocation().getRegion() == null) {
406        LOG.warn("No serialized RegionInfo in " + r);
407        return true;
408      }
409      RegionInfo hri = current.getRegionLocation().getRegion();
410      if (excludeOfflinedSplitParents && hri.isSplitParent()) {
411        return true;
412      }
413      // Else call super and add this Result to the collection.
414      return super.visit(r);
415    }
416
417    @Override
418    void add(Result r) {
419      if (current == null) {
420        return;
421      }
422      for (HRegionLocation loc : current.getRegionLocations()) {
423        if (loc != null) {
424          this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName()));
425        }
426      }
427    }
428  }
429
430  /**
431   * Collects all returned.
432   */
433  static class CollectAllVisitor extends CollectingVisitor<Result> {
434    @Override
435    void add(Result r) {
436      this.results.add(r);
437    }
438  }
439
440  private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
441    Scan scan = new Scan();
442    int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
443      HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
444    if (
445      metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
446        HConstants.DEFAULT_USE_META_REPLICAS)
447    ) {
448      scan.setConsistency(Consistency.TIMELINE);
449    }
450    if (rowUpperLimit <= scannerCaching) {
451      scan.setLimit(rowUpperLimit);
452    }
453    int rows = Math.min(rowUpperLimit, scannerCaching);
454    scan.setCaching(rows);
455    return scan;
456  }
457
458  /** Returns an HRegionLocationList extracted from the result. */
459  private static Optional<RegionLocations> getRegionLocations(Result r) {
460    return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r));
461  }
462
463  /** Returns start row for scanning META according to query type */
464  public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
465    if (tableName == null) {
466      return null;
467    }
468    switch (type) {
469      case REGION:
470      case REPLICATION: {
471        byte[] startRow = new byte[tableName.getName().length + 2];
472        System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
473        startRow[startRow.length - 2] = HConstants.DELIMITER;
474        startRow[startRow.length - 1] = HConstants.DELIMITER;
475        return startRow;
476      }
477      case ALL:
478      case TABLE:
479      default: {
480        return tableName.getName();
481      }
482    }
483  }
484
485  /** Returns stop row for scanning META according to query type */
486  public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
487    if (tableName == null) {
488      return null;
489    }
490    final byte[] stopRow;
491    switch (type) {
492      case REGION:
493      case REPLICATION: {
494        stopRow = new byte[tableName.getName().length + 3];
495        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
496        stopRow[stopRow.length - 3] = ' ';
497        stopRow[stopRow.length - 2] = HConstants.DELIMITER;
498        stopRow[stopRow.length - 1] = HConstants.DELIMITER;
499        break;
500      }
501      case ALL:
502      case TABLE:
503      default: {
504        stopRow = new byte[tableName.getName().length + 1];
505        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
506        stopRow[stopRow.length - 1] = ' ';
507        break;
508      }
509    }
510    return stopRow;
511  }
512}