001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.Closeable;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CompletableFuture;
029import java.util.stream.Collectors;
030import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
031import org.apache.hadoop.hbase.client.AsyncTable;
032import org.apache.hadoop.hbase.client.Consistency;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Scan.ReadType;
038import org.apache.hadoop.hbase.client.TableState;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.Pair;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * The (asynchronous) meta table accessor used at client side. Used to read/write region and
047 * assignment information store in <code>hbase:meta</code>.
048 * @since 2.0.0
049 * @see CatalogFamilyFormat
050 */
051@InterfaceAudience.Private
052public final class ClientMetaTableAccessor {
053
054  private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class);
055
056  private ClientMetaTableAccessor() {
057  }
058
059  @InterfaceAudience.Private
060  public enum QueryType {
061    ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), REGION(HConstants.CATALOG_FAMILY),
062    TABLE(HConstants.TABLE_FAMILY), REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
063
064    private final byte[][] families;
065
066    QueryType(byte[]... families) {
067      this.families = families;
068    }
069
070    byte[][] getFamilies() {
071      return this.families;
072    }
073  }
074
075  public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
076    TableName tableName) {
077    return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
078  }
079
080  public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
081    TableName tableName) {
082    CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
083    Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
084      HConstants.TABLE_STATE_QUALIFIER);
085    addListener(metaTable.get(get), (result, error) -> {
086      if (error != null) {
087        future.completeExceptionally(error);
088        return;
089      }
090      try {
091        future.complete(getTableState(result));
092      } catch (IOException e) {
093        future.completeExceptionally(e);
094      }
095    });
096    return future;
097  }
098
099  /**
100   * Returns the HRegionLocation from meta for the given region
101   * @param metaTable
102   * @param regionName region we're looking for
103   * @return HRegionLocation for the given region
104   */
105  public static CompletableFuture<Optional<HRegionLocation>>
106    getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) {
107    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
108    try {
109      RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
110      addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo))
111        .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
112          if (err != null) {
113            future.completeExceptionally(err);
114            return;
115          }
116          future.complete(getRegionLocations(r)
117            .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
118        });
119    } catch (IOException parseEx) {
120      LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
121      future.completeExceptionally(parseEx);
122    }
123    return future;
124  }
125
126  /**
127   * Returns the HRegionLocation from meta for the given encoded region name
128   * @param metaTable
129   * @param encodedRegionName region we're looking for
130   * @return HRegionLocation for the given region
131   */
132  public static CompletableFuture<Optional<HRegionLocation>>
133    getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) {
134    CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
135    addListener(
136      metaTable
137        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
138      (results, err) -> {
139        if (err != null) {
140          future.completeExceptionally(err);
141          return;
142        }
143        String encodedRegionNameStr = Bytes.toString(encodedRegionName);
144        results.stream().filter(result -> !result.isEmpty())
145          .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> {
146            getRegionLocations(result).ifPresent(locations -> {
147              for (HRegionLocation location : locations.getRegionLocations()) {
148                if (location != null &&
149                  encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
150                  future.complete(Optional.of(location));
151                  return;
152                }
153              }
154            });
155          });
156        future.complete(Optional.empty());
157      });
158    return future;
159  }
160
161  private static Optional<TableState> getTableState(Result r) throws IOException {
162    return Optional.ofNullable(CatalogFamilyFormat.getTableState(r));
163  }
164
165  /**
166   * Used to get all region locations for the specific table.
167   * @param metaTable
168   * @param tableName table we're looking for, can be null for getting all regions
169   * @return the list of region locations. The return value will be wrapped by a
170   *         {@link CompletableFuture}.
171   */
172  public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
173    AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
174    CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
175    addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
176      if (err != null) {
177        future.completeExceptionally(err);
178      } else if (locations == null || locations.isEmpty()) {
179        future.complete(Collections.emptyList());
180      } else {
181        List<HRegionLocation> regionLocations =
182          locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
183            .collect(Collectors.toList());
184        future.complete(regionLocations);
185      }
186    });
187    return future;
188  }
189
190  /**
191   * Used to get table regions' info and server.
192   * @param metaTable
193   * @param tableName table we're looking for, can be null for getting all regions
194   * @param excludeOfflinedSplitParents don't return split parents
195   * @return the list of regioninfos and server. The return value will be wrapped by a
196   *         {@link CompletableFuture}.
197   */
198  private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
199    final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
200    final boolean excludeOfflinedSplitParents) {
201    CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
202    if (TableName.META_TABLE_NAME.equals(tableName)) {
203      future.completeExceptionally(new IOException(
204        "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
205    }
206
207    // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
208    CollectRegionLocationsVisitor visitor =
209      new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
210
211    addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
212      if (error != null) {
213        future.completeExceptionally(error);
214        return;
215      }
216      future.complete(visitor.getResults());
217    });
218    return future;
219  }
220
221  /**
222   * Performs a scan of META table for given table.
223   * @param metaTable
224   * @param tableName table withing we scan
225   * @param type scanned part of meta
226   * @param visitor Visitor invoked against each row
227   */
228  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
229    TableName tableName, QueryType type, final Visitor visitor) {
230    return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
231      getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
232  }
233
234  /**
235   * Performs a scan of META table for given table.
236   * @param metaTable
237   * @param startRow Where to start the scan
238   * @param stopRow Where to stop the scan
239   * @param type scanned part of meta
240   * @param maxRows maximum rows to return
241   * @param visitor Visitor invoked against each row
242   */
243  private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
244    byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
245    int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
246    Scan scan = getMetaScan(metaTable, rowUpperLimit);
247    for (byte[] family : type.getFamilies()) {
248      scan.addFamily(family);
249    }
250    if (startRow != null) {
251      scan.withStartRow(startRow);
252    }
253    if (stopRow != null) {
254      scan.withStopRow(stopRow);
255    }
256
257    if (LOG.isDebugEnabled()) {
258      LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow()) +
259        " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max=" +
260        rowUpperLimit + " with caching=" + scan.getCaching());
261    }
262
263    CompletableFuture<Void> future = new CompletableFuture<Void>();
264    metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
265    return future;
266  }
267
268  private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
269
270    private int currentRowCount;
271
272    private final int rowUpperLimit;
273
274    private final Visitor visitor;
275
276    private final CompletableFuture<Void> future;
277
278    MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
279      CompletableFuture<Void> future) {
280      this.rowUpperLimit = rowUpperLimit;
281      this.visitor = visitor;
282      this.future = future;
283      this.currentRowCount = 0;
284    }
285
286    @Override
287    public void onError(Throwable error) {
288      future.completeExceptionally(error);
289    }
290
291    @Override
292    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
293      justification = "https://github.com/findbugsproject/findbugs/issues/79")
294    public void onComplete() {
295      future.complete(null);
296    }
297
298    @Override
299    public void onNext(Result[] results, ScanController controller) {
300      boolean terminateScan = false;
301      for (Result result : results) {
302        try {
303          if (!visitor.visit(result)) {
304            terminateScan = true;
305            break;
306          }
307        } catch (Exception e) {
308          future.completeExceptionally(e);
309          terminateScan = true;
310          break;
311        }
312        if (++currentRowCount >= rowUpperLimit) {
313          terminateScan = true;
314          break;
315        }
316      }
317      if (terminateScan) {
318        controller.terminate();
319      }
320    }
321  }
322
323  /**
324   * Implementations 'visit' a catalog table row.
325   */
326  public interface Visitor {
327    /**
328     * Visit the catalog table row.
329     * @param r A row from catalog table
330     * @return True if we are to proceed scanning the table, else false if we are to stop now.
331     */
332    boolean visit(final Result r) throws IOException;
333  }
334
335  /**
336   * Implementations 'visit' a catalog table row but with close() at the end.
337   */
338  public interface CloseableVisitor extends Visitor, Closeable {
339  }
340
341  /**
342   * A {@link Visitor} that collects content out of passed {@link Result}.
343   */
344  private static abstract class CollectingVisitor<T> implements Visitor {
345    final List<T> results = new ArrayList<>();
346
347    @Override
348    public boolean visit(Result r) throws IOException {
349      if (r != null && !r.isEmpty()) {
350        add(r);
351      }
352      return true;
353    }
354
355    abstract void add(Result r);
356
357    /**
358     * @return Collected results; wait till visits complete to collect all possible results
359     */
360    List<T> getResults() {
361      return this.results;
362    }
363  }
364
365  static class CollectRegionLocationsVisitor
366    extends CollectingVisitor<Pair<RegionInfo, ServerName>> {
367
368    private final boolean excludeOfflinedSplitParents;
369
370    private RegionLocations current = null;
371
372    CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) {
373      this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
374    }
375
376    @Override
377    public boolean visit(Result r) throws IOException {
378      Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
379      current = currentRegionLocations.orElse(null);
380      if (current == null || current.getRegionLocation().getRegion() == null) {
381        LOG.warn("No serialized RegionInfo in " + r);
382        return true;
383      }
384      RegionInfo hri = current.getRegionLocation().getRegion();
385      if (excludeOfflinedSplitParents && hri.isSplitParent()) {
386        return true;
387      }
388      // Else call super and add this Result to the collection.
389      return super.visit(r);
390    }
391
392    @Override
393    void add(Result r) {
394      if (current == null) {
395        return;
396      }
397      for (HRegionLocation loc : current.getRegionLocations()) {
398        if (loc != null) {
399          this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName()));
400        }
401      }
402    }
403  }
404
405  /**
406   * Collects all returned.
407   */
408  static class CollectAllVisitor extends CollectingVisitor<Result> {
409    @Override
410    void add(Result r) {
411      this.results.add(r);
412    }
413  }
414
415  private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
416    Scan scan = new Scan();
417    int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
418      HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
419    if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
420      HConstants.DEFAULT_USE_META_REPLICAS)) {
421      scan.setConsistency(Consistency.TIMELINE);
422    }
423    if (rowUpperLimit <= scannerCaching) {
424      scan.setLimit(rowUpperLimit);
425    }
426    int rows = Math.min(rowUpperLimit, scannerCaching);
427    scan.setCaching(rows);
428    return scan;
429  }
430
431  /**
432   * Returns an HRegionLocationList extracted from the result.
433   * @return an HRegionLocationList containing all locations for the region range or null if we
434   *         can't deserialize the result.
435   */
436  private static Optional<RegionLocations> getRegionLocations(Result r) {
437    return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r));
438  }
439
440  /**
441   * @param tableName table we're working with
442   * @return start row for scanning META according to query type
443   */
444  public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
445    if (tableName == null) {
446      return null;
447    }
448    switch (type) {
449      case REGION:
450      case REPLICATION: {
451        byte[] startRow = new byte[tableName.getName().length + 2];
452        System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
453        startRow[startRow.length - 2] = HConstants.DELIMITER;
454        startRow[startRow.length - 1] = HConstants.DELIMITER;
455        return startRow;
456      }
457      case ALL:
458      case TABLE:
459      default: {
460        return tableName.getName();
461      }
462    }
463  }
464
465  /**
466   * @param tableName table we're working with
467   * @return stop row for scanning META according to query type
468   */
469  public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
470    if (tableName == null) {
471      return null;
472    }
473    final byte[] stopRow;
474    switch (type) {
475      case REGION:
476      case REPLICATION: {
477        stopRow = new byte[tableName.getName().length + 3];
478        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
479        stopRow[stopRow.length - 3] = ' ';
480        stopRow[stopRow.length - 2] = HConstants.DELIMITER;
481        stopRow[stopRow.length - 1] = HConstants.DELIMITER;
482        break;
483      }
484      case ALL:
485      case TABLE:
486      default: {
487        stopRow = new byte[tableName.getName().length + 1];
488        System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
489        stopRow[stopRow.length - 1] = ' ';
490        break;
491      }
492    }
493    return stopRow;
494  }
495}