/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.ClientMetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;

/**
 * Read/write operations on <code>hbase:meta</code> region as well as assignment information stored
 * to <code>hbase:meta</code>.
 * <p/>
 * Some of the methods of this class take a ZooKeeperWatcher as a param. The only reason for this
 * is that when this class is used on the client side (e.g. HBaseAdmin), we want to use a
 * short-lived connection (opened before each operation, closed right after), while when used on
 * the HM or HRS (like in AssignmentManager) we want a permanent connection.
 * <p/>
 * HBASE-10070 adds a replicaId to HRI, meaning more than one HRI can be defined for the same table
 * range (table, startKey, endKey). For every range, there will be at least one HRI defined, which
 * is called the default replica.
 * <p/>
 * <h2>Meta layout</h2> For each table there is a single row named for the table with a 'table'
 * column family. The column family currently has one column in it, the 'state' column:
 *
 * <pre>
 * table:state => contains table state
 * </pre>
 *
 * For the catalog family, see the comments of {@link CatalogFamilyFormat} for more details.
 * <p/>
 * TODO: Add rep_barrier for serial replication explanation. See SerialReplicationChecker.
 * <p/>
 * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, and should not
 * leak out of it (through Result objects, etc)
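 * <p/>
 * A minimal usage sketch (illustrative only; {@code conf} is assumed to be an existing
 * Configuration and "t1" an existing table):
 *
 * <pre>
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   TableState state = MetaTableAccessor.getTableState(conn, TableName.valueOf("t1"));
 *   List&lt;RegionInfo&gt; regions = MetaTableAccessor.getTableRegions(conn, TableName.valueOf("t1"));
 * }
 * </pre>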
 * @see CatalogFamilyFormat
 * @see ClientMetaTableAccessor
 */
@InterfaceAudience.Private
public final class MetaTableAccessor {

  private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
  private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");

  private MetaTableAccessor() {
  }

  @VisibleForTesting
  public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");

  private static final byte ESCAPE_BYTE = (byte) 0xFF;

  private static final byte SEPARATED_BYTE = 0x00;

  ////////////////////////
  // Reading operations //
  ////////////////////////

  /**
   * Performs a full scan of <code>hbase:meta</code> for regions.
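   * <p/>
   * Example (an illustrative sketch; {@code connection} is an open {@link Connection}):
   *
   * <pre>
   * AtomicInteger rows = new AtomicInteger();
   * MetaTableAccessor.fullScanRegions(connection, r -> {
   *   rows.incrementAndGet();
   *   return true; // keep visiting
   * });
   * </pre>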
   * @param connection connection we're using
   * @param visitor Visitor invoked against each row in regions family.
   */
  public static void fullScanRegions(Connection connection,
    final ClientMetaTableAccessor.Visitor visitor) throws IOException {
    scanMeta(connection, null, null, QueryType.REGION, visitor);
  }

  /**
   * Performs a full scan of <code>hbase:meta</code> for regions.
   * @param connection connection we're using
   */
  public static List<Result> fullScanRegions(Connection connection) throws IOException {
    return fullScan(connection, QueryType.REGION);
  }

  /**
   * Performs a full scan of <code>hbase:meta</code> for tables.
   * @param connection connection we're using
   * @param visitor Visitor invoked against each row in tables family.
   */
  public static void fullScanTables(Connection connection,
    final ClientMetaTableAccessor.Visitor visitor) throws IOException {
    scanMeta(connection, null, null, QueryType.TABLE, visitor);
  }

  /**
   * Performs a full scan of <code>hbase:meta</code>.
   * @param connection connection we're using
   * @param type scanned part of meta
   * @return List of {@link Result}
   */
  private static List<Result> fullScan(Connection connection, QueryType type) throws IOException {
    ClientMetaTableAccessor.CollectAllVisitor v = new ClientMetaTableAccessor.CollectAllVisitor();
    scanMeta(connection, null, null, type, v);
    return v.getResults();
  }

  /**
   * Callers should call close on the returned {@link Table} instance.
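   * <p/>
   * A typical (illustrative) call site uses try-with-resources so the table is always closed:
   *
   * <pre>
   * try (Table meta = MetaTableAccessor.getMetaHTable(connection)) {
   *   // read or mutate hbase:meta through the regular Table API
   * }
   * </pre>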
   * @param connection connection we're using to access Meta
   * @return A {@link Table} for <code>hbase:meta</code>
   * @throws NullPointerException if {@code connection} is {@code null}
   */
  public static Table getMetaHTable(final Connection connection) throws IOException {
    // We used to pass whole CatalogTracker in here, now we just pass in Connection
    Objects.requireNonNull(connection, "Connection cannot be null");
    if (connection.isClosed()) {
      throw new IOException("connection is closed");
    }
    return connection.getTable(TableName.META_TABLE_NAME);
  }

  /**
   * Gets the region info and assignment for the specified region.
   * @param connection connection we're using
   * @param regionName Region to lookup.
   * @return Location and RegionInfo for <code>regionName</code>
   * @deprecated use {@link #getRegionLocation(Connection, byte[])} instead
   */
  @Deprecated
  public static Pair<RegionInfo, ServerName> getRegion(Connection connection, byte[] regionName)
    throws IOException {
    HRegionLocation location = getRegionLocation(connection, regionName);
    return location == null ? null : new Pair<>(location.getRegion(), location.getServerName());
  }

  /**
   * Returns the HRegionLocation from meta for the given region
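   * <p/>
   * Illustrative lookup (assuming {@code regionName} holds a full region name):
   *
   * <pre>
   * HRegionLocation loc = MetaTableAccessor.getRegionLocation(connection, regionName);
   * ServerName server = loc == null ? null : loc.getServerName();
   * </pre>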
   * @param connection connection we're using
   * @param regionName region we're looking for
   * @return HRegionLocation for the given region
   */
  public static HRegionLocation getRegionLocation(Connection connection, byte[] regionName)
    throws IOException {
    byte[] row = regionName;
    RegionInfo parsedInfo = null;
    try {
      parsedInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
      row = CatalogFamilyFormat.getMetaKeyForRegion(parsedInfo);
    } catch (Exception parseEx) {
      // Ignore. This is used with tableName passed as regionName.
    }
    Get get = new Get(row);
    get.addFamily(HConstants.CATALOG_FAMILY);
    Result r;
    try (Table t = getMetaHTable(connection)) {
      r = t.get(get);
    }
    RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
    return locations == null ? null :
      locations.getRegionLocation(parsedInfo == null ? 0 : parsedInfo.getReplicaId());
  }

  /**
   * Returns the HRegionLocation from meta for the given region
   * @param connection connection we're using
   * @param regionInfo region information
   * @return HRegionLocation for the given region
   */
  public static HRegionLocation getRegionLocation(Connection connection, RegionInfo regionInfo)
    throws IOException {
    return CatalogFamilyFormat.getRegionLocation(getCatalogFamilyRow(connection, regionInfo),
      regionInfo, regionInfo.getReplicaId());
  }

  /**
   * @return Return the {@link HConstants#CATALOG_FAMILY} row from hbase:meta.
   */
  public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
    throws IOException {
    Get get = new Get(CatalogFamilyFormat.getMetaKeyForRegion(ri));
    get.addFamily(HConstants.CATALOG_FAMILY);
    try (Table t = getMetaHTable(connection)) {
      return t.get(get);
    }
  }

  /**
   * Gets the result in hbase:meta for the specified region.
   * @param connection connection we're using
   * @param regionName region we're looking for
   * @return result of the specified region
   */
  public static Result getRegionResult(Connection connection, byte[] regionName)
    throws IOException {
    Get get = new Get(regionName);
    get.addFamily(HConstants.CATALOG_FAMILY);
    try (Table t = getMetaHTable(connection)) {
      return t.get(get);
    }
  }

  /**
   * Scans META table for a row whose key contains the specified <B>regionEncodedName</B>, returning
   * a single related <code>Result</code> instance if any row is found, null otherwise.
   * @param connection the connection to query META table.
   * @param regionEncodedName the region encoded name to look for at META.
   * @return <code>Result</code> instance with the row related info in META, null otherwise.
   * @throws IOException if any errors occur while querying META.
   */
  public static Result scanByRegionEncodedName(Connection connection, String regionEncodedName)
    throws IOException {
    RowFilter rowFilter =
      new RowFilter(CompareOperator.EQUAL, new SubstringComparator(regionEncodedName));
    Scan scan = getMetaScan(connection, 1);
    scan.setFilter(rowFilter);
    // Use try-with-resources so the table and scanner are always released.
    try (Table metaTable = getMetaHTable(connection);
      ResultScanner resultScanner = metaTable.getScanner(scan)) {
      return resultScanner.next();
    }
  }

  /**
   * @return Return all regioninfos listed in the 'info:merge*' columns of the
   *         <code>regionName</code> row.
   */
  @Nullable
  public static List<RegionInfo> getMergeRegions(Connection connection, byte[] regionName)
    throws IOException {
    return getMergeRegions(getRegionResult(connection, regionName).rawCells());
  }

  /**
   * @return Deserialized values of &lt;qualifier,regioninfo&gt; pairs taken from column values that
   *         match the regex 'info:merge.*' in array of <code>cells</code>.
   */
  @Nullable
  public static Map<String, RegionInfo> getMergeRegionsWithName(Cell[] cells) {
    if (cells == null) {
      return null;
    }
    Map<String, RegionInfo> regionsToMerge = null;
    for (Cell cell : cells) {
      if (!isMergeQualifierPrefix(cell)) {
        continue;
      }
      // Ok. This cell is that of an info:merge* column.
      RegionInfo ri = RegionInfo.parseFromOrNull(cell.getValueArray(), cell.getValueOffset(),
        cell.getValueLength());
      if (ri != null) {
        if (regionsToMerge == null) {
          regionsToMerge = new LinkedHashMap<>();
        }
        regionsToMerge.put(Bytes.toString(CellUtil.cloneQualifier(cell)), ri);
      }
    }
    return regionsToMerge;
  }

  /**
   * @return Deserialized regioninfo values taken from column values that match the regex
   *         'info:merge.*' in array of <code>cells</code>.
   */
  @Nullable
  public static List<RegionInfo> getMergeRegions(Cell[] cells) {
    Map<String, RegionInfo> mergeRegionsWithName = getMergeRegionsWithName(cells);
    return (mergeRegionsWithName == null) ? null : new ArrayList<>(mergeRegionsWithName.values());
  }

  /**
   * @return True if any merge regions present in <code>cells</code>; i.e. the column in
   *         <code>cell</code> matches the regex 'info:merge.*'.
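   * <p/>
   * Illustrative check (assuming an open {@code connection} and a {@code regionName} byte[]):
   *
   * <pre>
   * boolean hasReferences = MetaTableAccessor
   *   .hasMergeRegions(MetaTableAccessor.getRegionResult(connection, regionName).rawCells());
   * </pre>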
   */
  public static boolean hasMergeRegions(Cell[] cells) {
    for (Cell cell : cells) {
      if (!isMergeQualifierPrefix(cell)) {
        continue;
      }
      return true;
    }
    return false;
  }

  /**
   * @return True if the column in <code>cell</code> matches the regex 'info:merge.*'.
   */
  private static boolean isMergeQualifierPrefix(Cell cell) {
    // Check to see if has family and that qualifier starts with the merge qualifier 'merge'
    return CellUtil.matchingFamily(cell, HConstants.CATALOG_FAMILY) &&
      PrivateCellUtil.qualifierStartsWith(cell, HConstants.MERGE_QUALIFIER_PREFIX);
  }

  /**
   * Checks if the specified table exists. Looks at the hbase:meta table hosted on the specified
   * server.
   * @param connection connection we're using
   * @param tableName table to check
   * @return true if the table exists in meta, false if not
   */
  public static boolean tableExists(Connection connection, final TableName tableName)
    throws IOException {
    // Catalog tables always exist.
    return tableName.equals(TableName.META_TABLE_NAME) ||
      getTableState(connection, tableName) != null;
  }

  /**
   * Lists all of the regions currently in META.
   * @param connection to connect with
   * @param excludeOfflinedSplitParents true to leave offlined split-parent regions out of the
   *          returned list, false to include them
   * @return List of all user-space regions.
   */
  @VisibleForTesting
  public static List<RegionInfo> getAllRegions(Connection connection,
    boolean excludeOfflinedSplitParents) throws IOException {
    List<Pair<RegionInfo, ServerName>> result =
      getTableRegionsAndLocations(connection, null, excludeOfflinedSplitParents);
    return getListOfRegionInfos(result);
  }

  /**
   * Gets all of the regions of the specified table. Do not use this method to get meta table
   * regions, use methods in MetaTableLocator instead.
   * @param connection connection we're using
   * @param tableName table we're looking for
   * @return Ordered list of {@link RegionInfo}.
   */
  public static List<RegionInfo> getTableRegions(Connection connection, TableName tableName)
    throws IOException {
    return getTableRegions(connection, tableName, false);
  }

  /**
   * Gets all of the regions of the specified table. Do not use this method to get meta table
   * regions, use methods in MetaTableLocator instead.
   * @param connection connection we're using
   * @param tableName table we're looking for
   * @param excludeOfflinedSplitParents If true, do not include offlined split parents in the
   *          return.
   * @return Ordered list of {@link RegionInfo}.
   */
  public static List<RegionInfo> getTableRegions(Connection connection, TableName tableName,
    final boolean excludeOfflinedSplitParents) throws IOException {
    List<Pair<RegionInfo, ServerName>> result =
      getTableRegionsAndLocations(connection, tableName, excludeOfflinedSplitParents);
    return getListOfRegionInfos(result);
  }

  private static List<RegionInfo>
    getListOfRegionInfos(final List<Pair<RegionInfo, ServerName>> pairs) {
    if (pairs == null || pairs.isEmpty()) {
      return Collections.emptyList();
    }
    List<RegionInfo> result = new ArrayList<>(pairs.size());
    for (Pair<RegionInfo, ServerName> pair : pairs) {
      result.add(pair.getFirst());
    }
    return result;
  }

  /**
   * This method creates a Scan object that will only scan catalog rows that belong to the specified
   * table. It doesn't specify any columns. This is a better alternative to just using a start row
   * and scanning until it hits a new table, since that requires parsing the HRI to get the table
   * name.
   * @param tableName bytes of table's name
   * @return configured Scan object
   * @deprecated This is internal so please remove it when we get a chance.
   */
  @Deprecated
  public static Scan getScanForTableName(Connection connection, TableName tableName) {
    // Start key is just the table name with delimiters
    byte[] startKey = ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION);
    // Stop key appends the smallest possible char to the table name
    byte[] stopKey = ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION);

    Scan scan = getMetaScan(connection, -1);
    scan.withStartRow(startKey);
    scan.withStopRow(stopKey);
    return scan;
  }

  private static Scan getMetaScan(Connection connection, int rowUpperLimit) {
    Scan scan = new Scan();
    int scannerCaching = connection.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
      HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
    if (connection.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
      HConstants.DEFAULT_USE_META_REPLICAS)) {
      scan.setConsistency(Consistency.TIMELINE);
    }
    if (rowUpperLimit > 0) {
      scan.setLimit(rowUpperLimit);
      scan.setReadType(Scan.ReadType.PREAD);
    }
    scan.setCaching(scannerCaching);
    return scan;
  }

  /**
   * Do not use this method to get meta table regions, use methods in MetaTableLocator instead.
   * @param connection connection we're using
   * @param tableName table we're looking for
   * @return Return list of regioninfos and server.
   */
  public static List<Pair<RegionInfo, ServerName>>
    getTableRegionsAndLocations(Connection connection, TableName tableName) throws IOException {
    return getTableRegionsAndLocations(connection, tableName, true);
  }

  /**
   * Do not use this method to get meta table regions, use methods in MetaTableLocator instead.
   * @param connection connection we're using
   * @param tableName table to work with, can be null for getting all regions
   * @param excludeOfflinedSplitParents don't return split parents
   * @return Return list of regioninfos and server addresses.
   */
  // What happens here when 1M regions in hbase:meta? This won't scale?
  public static List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(
    Connection connection, @Nullable final TableName tableName,
    final boolean excludeOfflinedSplitParents) throws IOException {
    if (tableName != null && tableName.equals(TableName.META_TABLE_NAME)) {
      throw new IOException(
        "This method can't be used to locate meta regions;" + " use MetaTableLocator instead");
    }
    // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
    ClientMetaTableAccessor.CollectRegionLocationsVisitor visitor =
      new ClientMetaTableAccessor.CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
    scanMeta(connection,
      ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION),
      ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION), QueryType.REGION,
      visitor);
    return visitor.getResults();
  }

  public static void fullScanMetaAndPrint(Connection connection) throws IOException {
    ClientMetaTableAccessor.Visitor v = r -> {
      if (r == null || r.isEmpty()) {
        return true;
      }
      LOG.info("fullScanMetaAndPrint.Current Meta Row: " + r);
      TableState state = CatalogFamilyFormat.getTableState(r);
      if (state != null) {
        LOG.info("fullScanMetaAndPrint.Table State={}", state);
      } else {
        RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
        if (locations == null) {
          return true;
        }
        for (HRegionLocation loc : locations.getRegionLocations()) {
          if (loc != null) {
            LOG.info("fullScanMetaAndPrint.HRI Print={}", loc.getRegion());
          }
        }
      }
      return true;
    };
    scanMeta(connection, null, null, QueryType.ALL, v);
  }

  public static void scanMetaForTableRegions(Connection connection,
    ClientMetaTableAccessor.Visitor visitor, TableName tableName) throws IOException {
    scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
  }

  private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
    final ClientMetaTableAccessor.Visitor visitor) throws IOException {
    scanMeta(connection, ClientMetaTableAccessor.getTableStartRowForMeta(table, type),
      ClientMetaTableAccessor.getTableStopRowForMeta(table, type), type, maxRows, visitor);
  }

  private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
    @Nullable final byte[] stopRow, QueryType type, final ClientMetaTableAccessor.Visitor visitor)
    throws IOException {
    scanMeta(connection, startRow, stopRow, type, Integer.MAX_VALUE, visitor);
  }

  /**
   * Performs a scan of META table for given table starting from given row.
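   * <p/>
   * Example (illustrative; {@code visitor} and {@code startKey} are assumed to exist):
   *
   * <pre>
   * // visit at most 10 region rows of table "t1", starting at the region containing startKey
   * MetaTableAccessor.scanMeta(connection, visitor, TableName.valueOf("t1"), startKey, 10);
   * </pre>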
   * @param connection connection we're using
   * @param visitor visitor to call
   * @param tableName table within which we scan
   * @param row start scan from this row
   * @param rowLimit max number of rows to return
   */
  public static void scanMeta(Connection connection, final ClientMetaTableAccessor.Visitor visitor,
    final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
    byte[] startRow = null;
    byte[] stopRow = null;
    if (tableName != null) {
      startRow = ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION);
      if (row != null) {
        RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
        startRow =
          RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
      }
      stopRow = ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION);
    }
    scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
  }

  /**
   * Performs a scan of META table.
   * @param connection connection we're using
   * @param startRow Where to start the scan. Pass null to begin the scan at the first row.
   * @param stopRow Where to stop the scan. Pass null to scan all rows from the start row onwards.
   * @param type scanned part of meta
   * @param maxRows maximum rows to return
   * @param visitor Visitor invoked against each row.
   */
  static void scanMeta(Connection connection, @Nullable final byte[] startRow,
    @Nullable final byte[] stopRow, QueryType type, int maxRows,
    final ClientMetaTableAccessor.Visitor visitor) throws IOException {
    scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
  }

  private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
    @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
    final ClientMetaTableAccessor.Visitor visitor) throws IOException {
    int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
    Scan scan = getMetaScan(connection, rowUpperLimit);

    for (byte[] family : type.getFamilies()) {
      scan.addFamily(family);
    }
    if (startRow != null) {
      scan.withStartRow(startRow);
    }
    if (stopRow != null) {
      scan.withStopRow(stopRow);
    }
    if (filter != null) {
      scan.setFilter(filter);
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow) +
        " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit +
        " with caching=" + scan.getCaching());
    }

    int currentRow = 0;
    try (Table metaTable = getMetaHTable(connection)) {
      try (ResultScanner scanner = metaTable.getScanner(scan)) {
        Result data;
        while ((data = scanner.next()) != null) {
          if (data.isEmpty()) {
            continue;
          }
          // Break if visit returns false.
          if (!visitor.visit(data)) {
            break;
          }
          if (++currentRow >= rowUpperLimit) {
            break;
          }
        }
      }
    }
    if (visitor instanceof Closeable) {
      try {
        ((Closeable) visitor).close();
      } catch (Throwable t) {
        ExceptionUtil.rethrowIfInterrupt(t);
        LOG.debug("Got exception in closing the meta scanner visitor", t);
      }
    }
  }

  /**
   * @return RegionInfo for the closest meta region row to the passed <code>row</code>
   */
  @NonNull
  private static RegionInfo getClosestRegionInfo(Connection connection,
    @NonNull final TableName tableName, @NonNull final byte[] row) throws IOException {
    byte[] searchRow = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    Scan scan = getMetaScan(connection, 1);
    scan.setReversed(true);
    scan.withStartRow(searchRow);
    try (ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan)) {
      Result result = resultScanner.next();
      if (result == null) {
        throw new TableNotFoundException("Cannot find row in META for table: " + tableName +
          ", row=" + Bytes.toStringBinary(row));
      }
      RegionInfo regionInfo = CatalogFamilyFormat.getRegionInfo(result);
      if (regionInfo == null) {
        throw new IOException("RegionInfo was null or empty in Meta for " + tableName + ", row=" +
          Bytes.toStringBinary(row));
      }
      return regionInfo;
    }
  }

  /**
   * Returns the {@link ServerName} from catalog table {@link Result} that the region is
   * transitioning on. It should be the same as
   * {@link CatalogFamilyFormat#getServerName(Result,int)} if the server is at OPEN state.
   * @param r Result to pull the transitioning server name from
   * @return A ServerName instance, or the value from
   *         {@link CatalogFamilyFormat#getServerName(Result,int)} if the necessary fields are not
   *         found or are empty.
   */
  @Nullable
  public static ServerName getTargetServerName(final Result r, final int replicaId) {
    final Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY,
      CatalogFamilyFormat.getServerNameColumn(replicaId));
    if (cell == null || cell.getValueLength() == 0) {
      RegionLocations locations = CatalogFamilyFormat.getRegionLocations(r);
      if (locations != null) {
        HRegionLocation location = locations.getRegionLocation(replicaId);
        if (location != null) {
          return location.getServerName();
        }
      }
      return null;
    }
    return ServerName.parseServerName(
      Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
  }

  /**
   * Returns the daughter regions by reading the corresponding columns of the catalog table Result.
   * @param data a Result object from the catalog table scan
   * @return pair of RegionInfo or PairOfSameType(null, null) if region is not a split parent
   */
  public static PairOfSameType<RegionInfo> getDaughterRegions(Result data) {
    RegionInfo splitA = CatalogFamilyFormat.getRegionInfo(data, HConstants.SPLITA_QUALIFIER);
    RegionInfo splitB = CatalogFamilyFormat.getRegionInfo(data, HConstants.SPLITB_QUALIFIER);
    return new PairOfSameType<>(splitA, splitB);
  }

  /**
   * Fetch table state for given table from META table
   * @param conn connection to use
   * @param tableName table to fetch state for
   */
  @Nullable
  public static TableState getTableState(Connection conn, TableName tableName) throws IOException {
    if (tableName.equals(TableName.META_TABLE_NAME)) {
      return new TableState(tableName, TableState.State.ENABLED);
    }
    Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
      HConstants.TABLE_STATE_QUALIFIER);
    // Close the meta table after the get; callers only need the parsed TableState.
    try (Table metaHTable = getMetaHTable(conn)) {
      Result result = metaHTable.get(get);
      return CatalogFamilyFormat.getTableState(result);
    }
  }

  /**
   * Fetch table states from META table
   * @param conn connection to use
   * @return map {tableName -&gt; state}
   */
  public static Map<TableName, TableState> getTableStates(Connection conn) throws IOException {
    final Map<TableName, TableState> states = new LinkedHashMap<>();
    ClientMetaTableAccessor.Visitor collector = r -> {
      TableState state = CatalogFamilyFormat.getTableState(r);
      if (state != null) {
        states.put(state.getTableName(), state);
      }
      return true;
    };
    fullScanTables(conn, collector);
    return states;
  }

  /**
   * Updates state in META. Do not use; for internal use only.
   * @param conn connection to use
   * @param tableName table to look for
   */
  public static void updateTableState(Connection conn, TableName tableName, TableState.State actual)
    throws IOException {
    updateTableState(conn, new TableState(tableName, actual));
  }

  /**
   * Count regions in <code>hbase:meta</code> for passed table.
   * @param c Configuration object
   * @param tableName table name to count regions for
   * @return Count of regions in table <code>tableName</code>
   */
  public static int getRegionCount(final Configuration c, final TableName tableName)
    throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(c)) {
      return getRegionCount(connection, tableName);
    }
  }

  /**
   * Count regions in <code>hbase:meta</code> for passed table.
   * @param connection Connection object
   * @param tableName table name to count regions for
   * @return Count of regions in table <code>tableName</code>
   */
  public static int getRegionCount(final Connection connection, final TableName tableName)
    throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locations = locator.getAllRegionLocations();
      return locations == null ? 0 : locations.size();
    }
  }

  ////////////////////////
  // Editing operations //
  ////////////////////////

  /**
   * Generates and returns a Put containing the region info for the catalog table
   */
  public static Put makePutFromRegionInfo(RegionInfo regionInfo, long ts) throws IOException {
    return addRegionInfo(new Put(regionInfo.getRegionName(), ts), regionInfo);
  }

  /**
   * Generates and returns a Delete containing the region info for the catalog table
   */
  private static Delete makeDeleteFromRegionInfo(RegionInfo regionInfo, long ts) {
    if (regionInfo == null) {
      throw new IllegalArgumentException("Can't make a delete for null region");
    }
    Delete delete = new Delete(regionInfo.getRegionName());
    delete.addFamily(HConstants.CATALOG_FAMILY, ts);
    return delete;
  }

  /**
   * Adds split daughters to the Put
   */
  private static Put addDaughtersToPut(Put put, RegionInfo splitA, RegionInfo splitB)
    throws IOException {
    if (splitA != null) {
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY).setQualifier(HConstants.SPLITA_QUALIFIER)
        .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(RegionInfo.toByteArray(splitA))
        .build());
    }
    if (splitB != null) {
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY).setQualifier(HConstants.SPLITB_QUALIFIER)
        .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(RegionInfo.toByteArray(splitB))
        .build());
    }
    return put;
  }

  /**
   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param p Put to add to hbase:meta
   */
  private static void putToMetaTable(Connection connection, Put p) throws IOException {
    try (Table table = getMetaHTable(connection)) {
      put(table, p);
    }
  }

  /**
   * @param t Table to use
   * @param p put to make
   */
  private static void put(Table t, Put p) throws IOException {
    debugLogMutation(p);
    t.put(p);
  }

  /**
   * Put the passed <code>ps</code> to the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param ps Puts to add to hbase:meta
   */
  public static void putsToMetaTable(final Connection connection, final List<Put> ps)
    throws IOException {
    if (ps.isEmpty()) {
      return;
    }
    try (Table t = getMetaHTable(connection)) {
      debugLogMutations(ps);
      // the implementation for putting a single Put is much simpler so here we do a check first.
      if (ps.size() == 1) {
        t.put(ps.get(0));
      } else {
        t.put(ps);
      }
    }
  }

  /**
   * Delete the passed <code>d</code> from the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param d Delete to add to hbase:meta
   */
  private static void deleteFromMetaTable(final Connection connection, final Delete d)
    throws IOException {
    List<Delete> dels = new ArrayList<>(1);
    dels.add(d);
    deleteFromMetaTable(connection, dels);
  }

  /**
   * Delete the passed <code>deletes</code> from the <code>hbase:meta</code> table.
   * @param connection connection we're using
   * @param deletes Deletes to add to hbase:meta. This list should support #remove.
   */
  private static void deleteFromMetaTable(final Connection connection, final List<Delete> deletes)
    throws IOException {
    try (Table t = getMetaHTable(connection)) {
      debugLogMutations(deletes);
      t.delete(deletes);
    }
  }

  /**
   * Deletes some replica columns corresponding to replicas for the passed rows
   * @param metaRows rows in hbase:meta
   * @param replicaIndexToDeleteFrom the replica ID we would start deleting from
   * @param numReplicasToRemove how many replicas to remove
   * @param connection connection we're using to access meta table
   */
  public static void removeRegionReplicasFromMeta(Set<byte[]> metaRows,
    int replicaIndexToDeleteFrom, int numReplicasToRemove, Connection connection)
    throws IOException {
    int absoluteIndex = replicaIndexToDeleteFrom + numReplicasToRemove;
    for (byte[] row : metaRows) {
      long now = EnvironmentEdgeManager.currentTime();
      Delete deleteReplicaLocations = new Delete(row);
      for (int i = replicaIndexToDeleteFrom; i < absoluteIndex; i++) {
        deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
          CatalogFamilyFormat.getServerColumn(i), now);
        deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
          CatalogFamilyFormat.getSeqNumColumn(i), now);
        deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
          CatalogFamilyFormat.getStartCodeColumn(i), now);
        deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
          CatalogFamilyFormat.getServerNameColumn(i), now);
        deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
          CatalogFamilyFormat.getRegionStateColumn(i), now);
      }

      deleteFromMetaTable(connection, deleteReplicaLocations);
    }
  }

  private static Put addRegionStateToPut(Put put, RegionState.State state) throws IOException {
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(HConstants.STATE_QUALIFIER)
      .setTimestamp(put.getTimestamp()).setType(Cell.Type.Put).setValue(Bytes.toBytes(state.name()))
      .build());
    return put;
  }

  /**
   * Update state column in hbase:meta.
   */
  public static void updateRegionState(Connection connection, RegionInfo ri,
    RegionState.State state) throws IOException {
    Put put = new Put(RegionReplicaUtil.getRegionInfoForDefaultReplica(ri).getRegionName());
    putsToMetaTable(connection, Collections.singletonList(addRegionStateToPut(put, state)));
  }

  /**
   * Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
   * add its daughters as different rows, but adds information about the daughters in the same row
   * as the parent. Use
   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)} if
   * you want to do that.
   * @param connection connection we're using
   * @param regionInfo RegionInfo of parent region
   * @param splitA first split daughter of the parent regionInfo
   * @param splitB second split daughter of the parent regionInfo
   * @throws IOException if problem connecting or updating meta
   */
  public static void addSplitsToParent(Connection connection, RegionInfo regionInfo,
    RegionInfo splitA, RegionInfo splitB) throws IOException {
    try (Table meta = getMetaHTable(connection)) {
      Put put = makePutFromRegionInfo(regionInfo, EnvironmentEdgeManager.currentTime());
      addDaughtersToPut(put, splitA, splitB);
      meta.put(put);
      debugLogMutation(put);
      LOG.debug("Added region {}", regionInfo.getRegionNameAsString());
    }
  }

  /**
   * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
   * does not add its daughters as different rows, but adds information about the daughters in the
   * same row as the parent. Use
   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)} if
   * you want to do that.
   * @param connection connection we're using
   * @param regionInfo region information
   * @throws IOException if problem connecting or updating meta
   */
  @VisibleForTesting
  public static void addRegionToMeta(Connection connection, RegionInfo regionInfo)
    throws IOException {
    addRegionsToMeta(connection, Collections.singletonList(regionInfo), 1);
  }

  /**
   * Adds a hbase:meta row for each of the specified new regions. Initial state for new regions is
   * CLOSED.
   * @param connection connection we're using
   * @param regionInfos region information list
   * @throws IOException if problem connecting or updating meta
   */
  public static void addRegionsToMeta(Connection connection, List<RegionInfo> regionInfos,
    int regionReplication) throws IOException {
    addRegionsToMeta(connection, regionInfos, regionReplication,
      EnvironmentEdgeManager.currentTime());
  }

  /**
   * Adds a hbase:meta row for each of the specified new regions. Initial state for new regions is
   * CLOSED.
   * @param connection connection we're using
   * @param regionInfos region information list
   * @param ts desired timestamp
   * @throws IOException if problem connecting or updating meta
   */
  private static void addRegionsToMeta(Connection connection, List<RegionInfo> regionInfos,
    int regionReplication, long ts) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (RegionInfo regionInfo : regionInfos) {
      if (RegionReplicaUtil.isDefaultReplica(regionInfo)) {
        Put put = makePutFromRegionInfo(regionInfo, ts);
        // New regions are added with initial state of CLOSED.
        addRegionStateToPut(put, RegionState.State.CLOSED);
        // Add empty locations for region replicas so that number of replicas can be cached
        // whenever the primary region is looked up from meta
        for (int i = 1; i < regionReplication; i++) {
          addEmptyLocation(put, i);
        }
        puts.add(put);
      }
    }
    putsToMetaTable(connection, puts);
    LOG.info("Added {} regions to meta.", puts.size());
  }

  @VisibleForTesting
  static Put addMergeRegions(Put put, Collection<RegionInfo> mergeRegions) throws IOException {
    int limit = 10000; // Arbitrary limit. No room in our formatted 'merge0000' below for more.
    int max = mergeRegions.size();
    if (max > limit) {
      // Should never happen!!!!! But just in case.
      throw new RuntimeException(
        "Can't merge " + max + " regions in one go; " + limit + " is upper-limit.");
    }
    int counter = 0;
    for (RegionInfo ri : mergeRegions) {
      String qualifier = String.format(HConstants.MERGE_QUALIFIER_PREFIX_STR + "%04d", counter++);
      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
        .setFamily(HConstants.CATALOG_FAMILY).setQualifier(Bytes.toBytes(qualifier))
        .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(RegionInfo.toByteArray(ri))
        .build());
    }
    return put;
  }

  /**
   * Merge regions into one in an atomic operation. Deletes the merging regions in hbase:meta and
   * adds the merged region.
   * @param connection connection we're using
   * @param mergedRegion the merged region
   * @param parentSeqNum Parent regions to merge and their next open sequence id used by serial
   *          replication. Set to -1 if not needed by this table.
   * @param sn the location of the region
   */
  public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
    Map<RegionInfo, Long> parentSeqNum, ServerName sn, int regionReplication) throws IOException {
    long time = HConstants.LATEST_TIMESTAMP;
    List<Mutation> mutations = new ArrayList<>();
    List<RegionInfo> replicationParents = new ArrayList<>();
    for (Map.Entry<RegionInfo, Long> e : parentSeqNum.entrySet()) {
      RegionInfo ri = e.getKey();
      long seqNum = e.getValue();
      // Deletes for merging regions
      mutations.add(makeDeleteFromRegionInfo(ri, time));
      if (seqNum > 0) {
        mutations.add(makePutForReplicationBarrier(ri, seqNum, time));
        replicationParents.add(ri);
      }
    }
    // Put for parent
    Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
    putOfMerged = addMergeRegions(putOfMerged, parentSeqNum.keySet());
    // Set initial state to CLOSED.
    // NOTE: If initial state is not set to CLOSED then merged region gets added with the
    // default OFFLINE state. If Master gets restarted after this step, start up sequence of
    // master tries to assign this offline region. This is followed by re-assignments of the
    // merged region from resumed {@link MergeTableRegionsProcedure}
    addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
    mutations.add(putOfMerged);
    // The merged region is a new region, so openSeqNum = 1 is fine. ServerName may be null
    // if a crash happened after the merge but before we got here, which means the in-memory
    // locations of the offlined, now-closed merged regions are lost. Should be ok; we
    // assign the merged region later.
    if (sn != null) {
      addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
    }

    // Add empty locations for region replicas of the merged region so that number of replicas
    // can be cached whenever the primary region is looked up from meta
    for (int i = 1; i < regionReplication; i++) {
      addEmptyLocation(putOfMerged, i);
    }
    // add parent reference for serial replication
    if (!replicationParents.isEmpty()) {
      addReplicationParent(putOfMerged, replicationParents);
    }
    byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
    multiMutate(connection, tableRow, mutations);
  }

  /**
   * Splits the region into two in an atomic operation. Offlines the parent region with the
   * information that it is split into two, and also adds the daughter regions. Does not add the
   * location information to the daughter regions since they are not open yet.
   * @param connection connection we're using
   * @param parent the parent region which is split
   * @param parentOpenSeqNum the next open sequence id for parent region, used by serial
   *          replication. -1 if not necessary.
   * @param splitA Split daughter region A
   * @param splitB Split daughter region B
   * @param sn the location of the region
   */
  public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
    RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
    long time = EnvironmentEdgeManager.currentTime();
    // Put for parent
    Put putParent = makePutFromRegionInfo(
      RegionInfoBuilder.newBuilder(parent).setOffline(true).setSplit(true).build(), time);
    addDaughtersToPut(putParent, splitA, splitB);

    // Puts for daughters
    Put putA = makePutFromRegionInfo(splitA, time);
    Put putB = makePutFromRegionInfo(splitB, time);
    if (parentOpenSeqNum > 0) {
      addReplicationBarrier(putParent, parentOpenSeqNum);
      addReplicationParent(putA, Collections.singletonList(parent));
      addReplicationParent(putB, Collections.singletonList(parent));
    }
    // Set initial state to CLOSED
    // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
    // default OFFLINE state. If Master gets restarted after this step, start up sequence of
    // master tries to assign these offline regions. This is followed by re-assignments of the
    // daughter regions from resumed {@link SplitTableRegionProcedure}
    addRegionStateToPut(putA, RegionState.State.CLOSED);
    addRegionStateToPut(putB, RegionState.State.CLOSED);

    addSequenceNum(putA, 1, splitA.getReplicaId()); // new regions, openSeqNum = 1 is fine.
    addSequenceNum(putB, 1, splitB.getReplicaId());

    // Add empty locations for region replicas of daughters so that number of replicas can be
    // cached whenever the primary region is looked up from meta
    for (int i = 1; i < regionReplication; i++) {
      addEmptyLocation(putA, i);
      addEmptyLocation(putB, i);
    }

    byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
    multiMutate(connection, tableRow, Arrays.asList(putParent, putA, putB));
  }

  /**
   * Update state of the table in meta.
   * @param connection what we use for update
   * @param state new state
   */
  private static void updateTableState(Connection connection, TableState state) throws IOException {
    Put put = makePutFromTableState(state, EnvironmentEdgeManager.currentTime());
    putToMetaTable(connection, put);
    LOG.info("Updated {} in hbase:meta", state);
  }

  /**
   * Construct PUT for given state
   * @param state new state
   */
  public static Put makePutFromTableState(TableState state, long ts) {
    Put put = new Put(state.getTableName().getName(), ts);
    put.addColumn(HConstants.TABLE_FAMILY, HConstants.TABLE_STATE_QUALIFIER,
      state.convert().toByteArray());
    return put;
  }

  /**
   * Remove state for table from meta
   * @param connection to use for deletion
   * @param table to delete state for
   */
  public static void deleteTableState(Connection connection, TableName table) throws IOException {
    long time = EnvironmentEdgeManager.currentTime();
    Delete delete = new Delete(table.getName());
    delete.addColumns(HConstants.TABLE_FAMILY, HConstants.TABLE_STATE_QUALIFIER, time);
    deleteFromMetaTable(connection, delete);
    LOG.info("Deleted table " + table + " state from META");
  }

  /**
   * Performs an atomic multi-mutate operation against the given table. Used by the likes of merge
   * and split as these want to make atomic mutations across multiple rows.
   * @throws IOException even if we encounter a RuntimeException, we'll still wrap it in an IOE.
   */
  private static void multiMutate(Connection conn, byte[] row, List<Mutation> mutations)
    throws IOException {
    debugLogMutations(mutations);
    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
    for (Mutation mutation : mutations) {
      if (mutation instanceof Put) {
        builder.addMutationRequest(
          ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
      } else if (mutation instanceof Delete) {
        builder.addMutationRequest(
          ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
      } else {
        throw new DoNotRetryIOException(
          "multi in MetaEditor doesn't support " + mutation.getClass().getName());
      }
    }
    MutateRowsRequest request = builder.build();
    AsyncTable<?> table = conn.toAsyncConnection().getTable(TableName.META_TABLE_NAME);
    CompletableFuture<MutateRowsResponse> future =
      table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
        MultiRowMutationService::newStub,
        (stub, controller, done) -> stub.mutateRows(controller, request, done), row);
    FutureUtils.get(future);
  }

  /**
   * Updates the location of the specified region in hbase:meta to be the specified server hostname
   * and startcode.
   * <p>
   * Uses the passed connection to reach the server hosting hbase:meta and makes edits to that
   * region.
   * @param connection connection we're using
   * @param regionInfo region to update location of
   * @param openSeqNum the latest sequence number obtained when the region was open
   * @param sn Server name
   * @param masterSystemTime wall clock time from master if passed in the open region RPC
   */
  @VisibleForTesting
  public static void updateRegionLocation(Connection connection, RegionInfo regionInfo,
    ServerName sn, long openSeqNum, long masterSystemTime) throws IOException {
    updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
  }

  /**
   * Updates the location of the specified region to be the specified server.
   * <p>
   * Connects to the specified server which should be hosting the specified catalog region name to
   * perform the edit.
   * @param connection connection we're using
   * @param regionInfo region to update location of
   * @param sn Server name
   * @param openSeqNum the latest sequence number obtained when the region was open
   * @param masterSystemTime wall clock time from master if passed in the open region RPC
   * @throws IOException In particular could throw {@link java.net.ConnectException} if the server
   *           is down on other end.
   */
  private static void updateLocation(Connection connection, RegionInfo regionInfo, ServerName sn,
    long openSeqNum, long masterSystemTime) throws IOException {
    // region replicas are kept in the primary region's row
    Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), masterSystemTime);
    addRegionInfo(put, regionInfo);
    addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
    putToMetaTable(connection, put);
    LOG.info("Updated row {} with server={}", regionInfo.getRegionNameAsString(), sn);
1230  }
1231
1232  /**
1233   * Deletes the specified region from META.
1234   * @param connection connection we're using
1235   * @param regionInfo region to be deleted from META
1236   */
1237  public static void deleteRegionInfo(Connection connection, RegionInfo regionInfo)
1238    throws IOException {
1239    Delete delete = new Delete(regionInfo.getRegionName());
1240    delete.addFamily(HConstants.CATALOG_FAMILY, HConstants.LATEST_TIMESTAMP);
1241    deleteFromMetaTable(connection, delete);
1242    LOG.info("Deleted " + regionInfo.getRegionNameAsString());
1243  }
1244
1245  /**
1246   * Deletes the specified regions from META.
1247   * @param connection connection we're using
1248   * @param regionsInfo list of regions to be deleted from META
1249   */
1250  public static void deleteRegionInfos(Connection connection, List<RegionInfo> regionsInfo)
1251    throws IOException {
1252    deleteRegionInfos(connection, regionsInfo, EnvironmentEdgeManager.currentTime());
1253  }
1254
1255  /**
1256   * Deletes the specified regions from META.
1257   * @param connection connection we're using
1258   * @param regionsInfo list of regions to be deleted from META
1259   */
  private static void deleteRegionInfos(Connection connection, List<RegionInfo> regionsInfo,
    long ts) throws IOException {
    List<Delete> deletes = new ArrayList<>(regionsInfo.size());
    for (RegionInfo hri : regionsInfo) {
      Delete e = new Delete(hri.getRegionName());
      e.addFamily(HConstants.CATALOG_FAMILY, ts);
      deletes.add(e);
    }
    deleteFromMetaTable(connection, deletes);
    LOG.info("Deleted {} regions from META", regionsInfo.size());
    LOG.debug("Deleted regions: {}", regionsInfo);
  }

  /**
   * Overwrites the specified regions in hbase:meta. Deletes the old rows for the given regions and
   * adds new ones. Regions added back have state CLOSED.
   * @param connection connection we're using
   * @param regionInfos list of regions to be added to META
   * @param regionReplication region replication count of the table
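   * <p>
   * A minimal usage sketch (hypothetical table and keys; assumes an open {@code Connection conn}
   * and a table with three region replicas):
   *
   * <pre>{@code
   * RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf("test_table"))
   *   .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
   * MetaTableAccessor.overwriteRegions(conn, Collections.singletonList(region), 3);
   * }</pre>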
   */
  public static void overwriteRegions(Connection connection, List<RegionInfo> regionInfos,
    int regionReplication) throws IOException {
    // use master time for delete marker and the Put
    long now = EnvironmentEdgeManager.currentTime();
    deleteRegionInfos(connection, regionInfos, now);
    // The delete markers above use ts `now` and the puts below use `now + 1`, so the deletes
    // cannot eclipse the puts even though both are issued back to back. See HBASE-9906 and
    // HBASE-9879 for the original problem; HBASE-8770 and HBASE-9905 tracked alternative fixes.
    // Since HBASE-13875 the master supplies the timestamps for these mutations, so the 20ms sleep
    // that used to be needed here is gone.
    addRegionsToMeta(connection, regionInfos, regionReplication, now + 1);
    LOG.info("Overwritten {} regions in hbase:meta", regionInfos.size());
    LOG.debug("Overwritten regions: {}", regionInfos);
  }

  /**
   * Deletes merge qualifiers for the specified merge region.
   * @param connection connection we're using
   * @param mergeRegion the merged region
   */
  public static void deleteMergeQualifiers(Connection connection, final RegionInfo mergeRegion)
    throws IOException {
    Delete delete = new Delete(mergeRegion.getRegionName());
    // NOTE: We are doing a new hbase:meta read here.
    Cell[] cells = getRegionResult(connection, mergeRegion.getRegionName()).rawCells();
    if (cells == null || cells.length == 0) {
      return;
    }
    List<byte[]> qualifiers = new ArrayList<>();
    for (Cell cell : cells) {
      if (!isMergeQualifierPrefix(cell)) {
        continue;
      }
      byte[] qualifier = CellUtil.cloneQualifier(cell);
      qualifiers.add(qualifier);
      delete.addColumns(HConstants.CATALOG_FAMILY, qualifier, HConstants.LATEST_TIMESTAMP);
    }

    // There is a race condition: a GCMultipleMergedRegionsProcedure may be scheduled while the
    // previous GCMultipleMergedRegionsProcedure is still running; in that case the second
    // procedure could delete the merged region by accident!
    if (qualifiers.isEmpty()) {
      LOG.info("No merge qualifiers for region " + mergeRegion.getRegionNameAsString() +
        " in the meta table; they have already been cleaned up, skipping.");
      return;
    }

    deleteFromMetaTable(connection, delete);
    LOG.info("Deleted merge references in " + mergeRegion.getRegionNameAsString() +
      ", deleted qualifiers " +
      qualifiers.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")));
  }

  public static Put addRegionInfo(final Put p, final RegionInfo hri) throws IOException {
    p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(p.getRow())
      .setFamily(HConstants.CATALOG_FAMILY).setQualifier(HConstants.REGIONINFO_QUALIFIER)
      .setTimestamp(p.getTimestamp()).setType(Type.Put)
      // Serialize the default replica HRI, otherwise a scan of hbase:meta
      // shows an info:regioninfo value whose encoded name and region
      // name differ from those of the hbase:meta row.
      .setValue(RegionInfo.toByteArray(RegionReplicaUtil.getRegionInfoForDefaultReplica(hri)))
      .build());
    return p;
  }

  public static Put addLocation(Put p, ServerName sn, long openSeqNum, int replicaId)
    throws IOException {
    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
    return p
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getServerColumn(replicaId)).setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getAddress().toString())).build())
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getStartCodeColumn(replicaId))
        .setTimestamp(p.getTimestamp()).setType(Cell.Type.Put)
        .setValue(Bytes.toBytes(sn.getStartcode())).build())
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
        .setType(Type.Put).setValue(Bytes.toBytes(openSeqNum)).build());
  }

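  /**
   * Writes the region name into the buffer, escaping every occurrence of {@code ESCAPE_BYTE} by
   * doubling it, so that the {@code ESCAPE_BYTE + SEPARATED_BYTE} pair can be used unambiguously
   * as the separator between region names in {@link #getParentsBytes(List)}.
   */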
  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
    for (byte b : regionName) {
      if (b == ESCAPE_BYTE) {
        out.write(ESCAPE_BYTE);
      }
      out.write(b);
    }
  }

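  /**
   * Serializes the names of the given parent regions into a single byte array. The region names
   * are joined by the two-byte sequence {@code ESCAPE_BYTE, SEPARATED_BYTE}; a literal
   * {@code ESCAPE_BYTE} inside a region name is written twice, so the encoding can be decoded
   * without ambiguity.
   */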
  @VisibleForTesting
  public static byte[] getParentsBytes(List<RegionInfo> parents) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Iterator<RegionInfo> iter = parents.iterator();
    writeRegionName(bos, iter.next().getRegionName());
    while (iter.hasNext()) {
      bos.write(ESCAPE_BYTE);
      bos.write(SEPARATED_BYTE);
      writeRegionName(bos, iter.next().getRegionName());
    }
    return bos.toByteArray();
  }

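  /**
   * Inverse of {@link #getParentsBytes(List)}: splits the encoded value back into the individual
   * parent region names, collapsing doubled {@code ESCAPE_BYTE}s back to a single byte.
   */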
  private static List<byte[]> parseParentsBytes(byte[] bytes) {
    List<byte[]> parents = new ArrayList<>();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    for (int i = 0; i < bytes.length; i++) {
      if (bytes[i] == ESCAPE_BYTE) {
        i++;
        if (bytes[i] == SEPARATED_BYTE) {
          parents.add(bos.toByteArray());
          bos.reset();
          continue;
        }
        // an escaped ESCAPE_BYTE: fall through and append the second byte
      }
      bos.write(bytes[i]);
    }
    if (bos.size() > 0) {
      parents.add(bos.toByteArray());
    }
    return parents;
  }

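  /**
   * Records the given parent regions in the replication barrier family of the put, encoded with
   * {@link #getParentsBytes(List)}. Serial replication uses this column to replicate the parents'
   * edits before those of the region created by the split/merge.
   */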
  private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
    byte[] value = getParentsBytes(parents);
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
  }

  public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
    throws IOException {
    Put put = new Put(regionInfo.getRegionName(), ts);
    addReplicationBarrier(put, openSeqNum);
    return put;
  }

  /**
   * Adds a replication barrier (the region's {@code openSeqNum}) to the given put. See the class
   * comment on SerialReplicationChecker for how the barriers are used.
   */
  public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(HConstants.SEQNUM_QUALIFIER)
      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(Bytes.toBytes(openSeqNum))
      .build());
  }

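  /**
   * Adds empty server, startcode and seqNum columns for the given replica id, so the replica shows
   * up in hbase:meta without a location assigned yet.
   */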
  private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
    return p
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getServerColumn(replicaId)).setTimestamp(p.getTimestamp())
        .setType(Type.Put).build())
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getStartCodeColumn(replicaId))
        .setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).build())
      .add(builder.clear().setRow(p.getRow()).setFamily(HConstants.CATALOG_FAMILY)
        .setQualifier(CatalogFamilyFormat.getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
        .setType(Cell.Type.Put).build());
  }

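  /**
   * Holds the replication barrier information read from hbase:meta for one region: the barrier
   * sequence ids, the region state, and the (possibly empty) list of parent region names.
   */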
  public static final class ReplicationBarrierResult {
    private final long[] barriers;
    private final RegionState.State state;
    private final List<byte[]> parentRegionNames;

    ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
      this.barriers = barriers;
      this.state = state;
      this.parentRegionNames = parentRegionNames;
    }

    public long[] getBarriers() {
      return barriers;
    }

    public RegionState.State getState() {
      return state;
    }

    public List<byte[]> getParentRegionNames() {
      return parentRegionNames;
    }

    @Override
    public String toString() {
      return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
        state + ", parentRegionNames=" +
        parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
        "]";
    }
  }

  private static long getReplicationBarrier(Cell c) {
    return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
  }

  public static long[] getReplicationBarriers(Result result) {
    return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
      .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
  }

  private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
    long[] barriers = getReplicationBarriers(result);
    byte[] stateBytes = result.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
    RegionState.State state =
      stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
    byte[] parentRegionsBytes =
      result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
    List<byte[]> parentRegionNames =
      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
    return new ReplicationBarrierResult(barriers, state, parentRegionNames);
  }

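  /**
   * Looks up the replication barrier information for the region of {@code tableName} that contains
   * {@code row}, by reverse-scanning hbase:meta from the meta row for the given row and returning
   * the first entry whose encoded region name matches {@code encodedRegionName}. Returns an empty
   * result (no barriers, {@code null} state) if no matching row is found.
   */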
  public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
    TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
      .addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)
      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
      .setCaching(10);
    try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
      for (Result result;;) {
        result = scanner.next();
        if (result == null) {
          return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
        }
        byte[] regionName = result.getRow();
        // TODO: we may look up a region which has already been split or merged so we need to check
        // whether the encoded name matches. Need to find a way to quit earlier when there is no
        // record for the given region, for now it will scan to the end of the table.
        if (!Bytes.equals(encodedRegionName,
          Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
          continue;
        }
        return getReplicationBarrierResult(result);
      }
    }
  }

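  /**
   * Returns all replication barrier sequence ids recorded in hbase:meta for the given region, in
   * ascending order with duplicates removed.
   */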
  public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
    throws IOException {
    try (Table table = getMetaHTable(conn)) {
      Result result = table.get(new Get(regionName)
        .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
        .readAllVersions());
      return getReplicationBarriers(result);
    }
  }

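  /**
   * Returns, for every region of {@code tableName} that has a replication barrier recorded in
   * hbase:meta, a pair of the encoded region name and the most recent barrier sequence id.
   */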
  public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
    TableName tableName) throws IOException {
    List<Pair<String, Long>> list = new ArrayList<>();
    scanMeta(conn,
      ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION),
      ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION),
      QueryType.REPLICATION, r -> {
        byte[] value =
          r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
        if (value == null) {
          return true;
        }
        long lastBarrier = Bytes.toLong(value);
        String encodedRegionName = RegionInfo.encodeRegionName(r.getRow());
        list.add(Pair.newPair(encodedRegionName, lastBarrier));
        return true;
      });
    return list;
  }

  public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
    TableName tableName) throws IOException {
    List<String> list = new ArrayList<>();
    scanMeta(conn,
      ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION),
      ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION),
      QueryType.REPLICATION, new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
        list.add(RegionInfo.encodeRegionName(r.getRow()));
        return true;
      });
    return list;
  }

  private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
    if (!METALOG.isDebugEnabled()) {
      return;
    }
    // Logging each mutation on a separate line makes it easier to spot the differences between
    // them visually, because they share the same starting indentation.
    for (Mutation mutation : mutations) {
      debugLogMutation(mutation);
    }
  }

  private static void debugLogMutation(Mutation p) throws IOException {
    METALOG.debug("{} {}", p.getClass().getSimpleName(), p.toJSON());
  }

  private static Put addSequenceNum(Put p, long openSeqNum, int replicaId) throws IOException {
    return p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(p.getRow())
      .setFamily(HConstants.CATALOG_FAMILY)
      .setQualifier(CatalogFamilyFormat.getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
      .setType(Type.Put).setValue(Bytes.toBytes(openSeqNum)).build());
  }
}