001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.impl;
019
020import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot;
021import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
022import static org.apache.hadoop.hbase.backup.BackupInfo.withType;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.nio.charset.StandardCharsets;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Objects;
040import java.util.Set;
041import java.util.TreeMap;
042import java.util.TreeSet;
043import java.util.function.Predicate;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.commons.lang3.ArrayUtils;
047import org.apache.commons.lang3.StringUtils;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.NamespaceDescriptor;
054import org.apache.hadoop.hbase.NamespaceExistException;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.TableExistsException;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotDisabledException;
059import org.apache.hadoop.hbase.backup.BackupInfo;
060import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
061import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
062import org.apache.hadoop.hbase.backup.BackupType;
063import org.apache.hadoop.hbase.client.Admin;
064import org.apache.hadoop.hbase.client.BufferedMutator;
065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
067import org.apache.hadoop.hbase.client.Connection;
068import org.apache.hadoop.hbase.client.Delete;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Put;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.SnapshotDescription;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
085import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
086import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
087
088import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
090
091/**
092 * This class provides API to access backup system table<br>
093 * Backup system table schema:<br>
094 * <p>
095 * <ul>
096 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
097 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
098 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of
099 * include table; value=empty</li>
100 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
101 * timestamp]</li>
102 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
103 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
104 * name</li>
105 * </ul>
106 * </p>
107 */
108@InterfaceAudience.Private
109public final class BackupSystemTable implements Closeable {
110
  /** Class logger; shared by all instances. */
  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
112
  /**
   * Value object describing one WAL file recorded in the backup system table: which backup it
   * belongs to, the WAL file name, and the backup root directory it was written under. Fields are
   * package-private and mutable; instances are populated once via the constructor.
   */
  static class WALItem {
    // Id of the backup session this WAL belongs to
    String backupId;
    // WAL unique file name
    String walFile;
    // Root directory path of the backup destination
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    // Renders as "/<backupRoot>/<backupId>/<walFile>" — a path-like identifier, not
    // necessarily an existing filesystem path.
    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }
141
142  /**
143   * Backup system table (main) name
144   */
145  private TableName tableName;
146
147  /**
148   * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
149   * separate table because we have to isolate general backup operations: create, merge etc from
150   * activity of RegionObserver, which controls process of a bulk loading
151   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
152   */
153  private TableName bulkLoadTableName;
154
155  /**
156   * Stores backup sessions (contexts)
157   */
158  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
159  /**
160   * Stores other meta
161   */
162  final static byte[] META_FAMILY = Bytes.toBytes("meta");
163  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
164  /**
165   * Connection to HBase cluster, shared among all instances
166   */
167  private final Connection connection;
168
169  private final static String BACKUP_INFO_PREFIX = "session:";
170  private final static String START_CODE_ROW = "startcode:";
171  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
172  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");
173
174  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
175  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
176
177  private final static String INCR_BACKUP_SET = "incrbackupset:";
178  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
179  private final static String RS_LOG_TS_PREFIX = "rslogts:";
180
181  private final static String BULK_LOAD_PREFIX = "bulk:";
182  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
183  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
184  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");
185
186  final static byte[] TBL_COL = Bytes.toBytes("tbl");
187  final static byte[] FAM_COL = Bytes.toBytes("fam");
188  final static byte[] PATH_COL = Bytes.toBytes("path");
189
190  private final static String SET_KEY_PREFIX = "backupset:";
191
192  // separator between BULK_LOAD_PREFIX and ordinals
193  private final static String BLK_LD_DELIM = ":";
194  private final static byte[] EMPTY_VALUE = new byte[] {};
195
196  // Safe delimiter in a string
197  private final static String NULL = "\u0000";
198
199  public BackupSystemTable(Connection conn) throws IOException {
200    this.connection = conn;
201    Configuration conf = this.connection.getConfiguration();
202    tableName = BackupSystemTable.getTableName(conf);
203    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
204    checkSystemTable();
205  }
206
207  private void checkSystemTable() throws IOException {
208    try (Admin admin = connection.getAdmin()) {
209      verifyNamespaceExists(admin);
210      Configuration conf = connection.getConfiguration();
211      if (!admin.tableExists(tableName)) {
212        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
213        createSystemTable(admin, backupHTD);
214      }
215      ensureTableEnabled(admin, tableName);
216      if (!admin.tableExists(bulkLoadTableName)) {
217        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
218        createSystemTable(admin, blHTD);
219      }
220      ensureTableEnabled(admin, bulkLoadTableName);
221      waitForSystemTable(admin, tableName);
222      waitForSystemTable(admin, bulkLoadTableName);
223    }
224  }
225
226  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
227    try {
228      admin.createTable(descriptor);
229    } catch (TableExistsException e) {
230      // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
231      // so may be subject to race conditions where one caller succeeds in creating the
232      // table and others fail because it now exists
233      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
234    }
235  }
236
237  private void verifyNamespaceExists(Admin admin) throws IOException {
238    String namespaceName = tableName.getNamespaceAsString();
239    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
240    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
241    boolean exists = false;
242    for (NamespaceDescriptor nsd : list) {
243      if (nsd.getName().equals(ns.getName())) {
244        exists = true;
245        break;
246      }
247    }
248    if (!exists) {
249      try {
250        admin.createNamespace(ns);
251      } catch (NamespaceExistException e) {
252        // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
253        // so may be subject to race conditions where one caller succeeds in creating the
254        // namespace and others fail because it now exists
255        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
256      }
257    }
258  }
259
260  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
261    // Return fast if the table is available and avoid a log message
262    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
263      return;
264    }
265    long TIMEOUT = 60000;
266    long startTime = EnvironmentEdgeManager.currentTime();
267    LOG.debug("Backup table {} is not present and available, waiting for it to become so",
268      tableName);
269    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
270      try {
271        Thread.sleep(100);
272      } catch (InterruptedException e) {
273        throw (IOException) new InterruptedIOException().initCause(e);
274      }
275      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
276        throw new IOException(
277          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
278      }
279    }
280    LOG.debug("Backup table {} exists and available", tableName);
281  }
282
  /**
   * No-op: the underlying {@link Connection} is shared and owned by the caller, so this instance
   * has nothing of its own to release.
   */
  @Override
  public void close() {
    // do nothing
  }
287
288  /**
289   * Updates status (state) of a backup session in backup system table table
290   * @param info backup info
291   * @throws IOException exception
292   */
293  public void updateBackupInfo(BackupInfo info) throws IOException {
294    if (LOG.isTraceEnabled()) {
295      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
296        + " set status=" + info.getState());
297    }
298    try (Table table = connection.getTable(tableName)) {
299      Put put = createPutForBackupInfo(info);
300      table.put(put);
301    }
302  }
303
304  /*
305   * @param backupId the backup Id
306   * @return Map of rows to path of bulk loaded hfile
307   */
308  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
309    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
310    try (Table table = connection.getTable(bulkLoadTableName);
311      ResultScanner scanner = table.getScanner(scan)) {
312      Result res = null;
313      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
314      while ((res = scanner.next()) != null) {
315        res.advance();
316        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
317        for (Cell cell : res.listCells()) {
318          if (
319            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
320              BackupSystemTable.PATH_COL.length) == 0
321          ) {
322            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
323          }
324        }
325      }
326      return map;
327    }
328  }
329
330  /**
331   * Deletes backup status from backup system table table
332   * @param backupId backup id
333   * @throws IOException exception
334   */
335  public void deleteBackupInfo(String backupId) throws IOException {
336    if (LOG.isTraceEnabled()) {
337      LOG.trace("delete backup status in backup system table for " + backupId);
338    }
339    try (Table table = connection.getTable(tableName)) {
340      Delete del = createDeleteForBackupInfo(backupId);
341      table.delete(del);
342    }
343  }
344
345  /**
346   * Registers a bulk load.
347   * @param tableName     table name
348   * @param region        the region receiving hfile
349   * @param cfToHfilePath column family and associated hfiles
350   */
351  public void registerBulkLoad(TableName tableName, byte[] region,
352    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
353    if (LOG.isDebugEnabled()) {
354      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
355        cfToHfilePath.size());
356    }
357    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
358      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
359      bufferedMutator.mutate(puts);
360      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
361    }
362  }
363
364  /**
365   * Removes entries from the table that tracks all bulk loaded hfiles.
366   * @param rows the row keys of the entries to be deleted
367   */
368  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
369    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
370      List<Delete> deletes = new ArrayList<>();
371      for (byte[] row : rows) {
372        Delete del = new Delete(row);
373        deletes.add(del);
374        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
375      }
376      bufferedMutator.mutate(deletes);
377      LOG.debug("Deleted {} bulk load entries.", rows.size());
378    }
379  }
380
381  /**
382   * Reads all registered bulk loads.
383   */
384  public List<BulkLoad> readBulkloadRows() throws IOException {
385    Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
386    return processBulkLoadRowScan(scan);
387  }
388
389  /**
390   * Reads the registered bulk loads for the given tables.
391   */
392  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
393    List<BulkLoad> result = new ArrayList<>();
394    for (TableName table : tableList) {
395      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
396      result.addAll(processBulkLoadRowScan(scan));
397    }
398    return result;
399  }
400
  /**
   * Runs the given scan over the bulk load table and materializes one {@link BulkLoad} per row.
   * Each row is expected to carry (at most) the "tbl", "fam" and "path" qualifiers; any of them
   * missing leaves the corresponding BulkLoad field null. The region name is derived from the
   * row key itself.
   */
  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName table = null;
        String fam = null;
        String path = null;
        String region = null;
        byte[] row = null;
        // All cells in the row share the same row key, so row/region are simply overwritten
        // with identical values on each iteration.
        for (Cell cell : res.listCells()) {
          row = CellUtil.cloneRow(cell);
          String rowStr = Bytes.toString(row);
          region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            table = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = Bytes.toString(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        result.add(new BulkLoad(table, region, fam, path, row));
        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
      }
    }
    return result;
  }
440
441  /**
442   * Reads backup status object (instance of backup info) from backup system table table
443   * @param backupId backup id
444   * @return Current status of backup session or null
445   */
446  public BackupInfo readBackupInfo(String backupId) throws IOException {
447    if (LOG.isTraceEnabled()) {
448      LOG.trace("read backup status from backup system table for: " + backupId);
449    }
450
451    try (Table table = connection.getTable(tableName)) {
452      Get get = createGetForBackupInfo(backupId);
453      Result res = table.get(get);
454      if (res.isEmpty()) {
455        return null;
456      }
457      return resultToBackupInfo(res);
458    }
459  }
460
461  /**
462   * Read the last backup start code (timestamp) of last successful backup. Will return null if
463   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
464   * there is no successful backup completed so far.
465   * @param backupRoot directory path to backup destination
466   * @return the timestamp of last successful backup
467   * @throws IOException exception
468   */
469  public String readBackupStartCode(String backupRoot) throws IOException {
470    LOG.trace("read backup start code from backup system table");
471
472    try (Table table = connection.getTable(tableName)) {
473      Get get = createGetForStartCode(backupRoot);
474      Result res = table.get(get);
475      if (res.isEmpty()) {
476        return null;
477      }
478      Cell cell = res.listCells().get(0);
479      byte[] val = CellUtil.cloneValue(cell);
480      if (val.length == 0) {
481        return null;
482      }
483      return new String(val, StandardCharsets.UTF_8);
484    }
485  }
486
487  /**
488   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
489   * @param startCode  start code
490   * @param backupRoot root directory path to backup
491   * @throws IOException exception
492   */
493  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
494    if (LOG.isTraceEnabled()) {
495      LOG.trace("write backup start code to backup system table " + startCode);
496    }
497    try (Table table = connection.getTable(tableName)) {
498      Put put = createPutForStartCode(startCode.toString(), backupRoot);
499      table.put(put);
500    }
501  }
502
503  /**
504   * Exclusive operations are: create, delete, merge
505   * @throws IOException if a table operation fails or an active backup exclusive operation is
506   *                     already underway
507   */
508  public void startBackupExclusiveOperation() throws IOException {
509    LOG.debug("Start new backup exclusive operation");
510
511    try (Table table = connection.getTable(tableName)) {
512      Put put = createPutForStartBackupSession();
513      // First try to put if row does not exist
514      if (
515        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
516          .ifNotExists().thenPut(put)
517      ) {
518        // Row exists, try to put if value == ACTIVE_SESSION_NO
519        if (
520          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
521            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
522        ) {
523          throw new ExclusiveOperationException();
524        }
525      }
526    }
527  }
528
529  private Put createPutForStartBackupSession() {
530    Put put = new Put(ACTIVE_SESSION_ROW);
531    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
532    return put;
533  }
534
535  public void finishBackupExclusiveOperation() throws IOException {
536    LOG.debug("Finish backup exclusive operation");
537
538    try (Table table = connection.getTable(tableName)) {
539      Put put = createPutForStopBackupSession();
540      if (
541        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
542          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
543      ) {
544        throw new IOException("There is no active backup exclusive operation");
545      }
546    }
547  }
548
549  private Put createPutForStopBackupSession() {
550    Put put = new Put(ACTIVE_SESSION_ROW);
551    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
552    return put;
553  }
554
555  /**
556   * Get the Region Servers log information after the last log roll from backup system table.
557   * @param backupRoot root directory path to backup
558   * @return RS log info
559   * @throws IOException exception
560   */
561  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
562    throws IOException {
563    LOG.trace("read region server last roll log result to backup system table");
564
565    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
566
567    try (Table table = connection.getTable(tableName);
568      ResultScanner scanner = table.getScanner(scan)) {
569      Result res;
570      HashMap<String, Long> rsTimestampMap = new HashMap<>();
571      while ((res = scanner.next()) != null) {
572        res.advance();
573        Cell cell = res.current();
574        byte[] row = CellUtil.cloneRow(cell);
575        String server = getServerNameForReadRegionServerLastLogRollResult(row);
576        byte[] data = CellUtil.cloneValue(cell);
577        rsTimestampMap.put(server, Bytes.toLong(data));
578      }
579      return rsTimestampMap;
580    }
581  }
582
583  /**
584   * Writes Region Server last roll log result (timestamp) to backup system table table
585   * @param server     Region Server name
586   * @param ts         last log timestamp
587   * @param backupRoot root directory path to backup
588   * @throws IOException exception
589   */
590  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
591    throws IOException {
592    LOG.trace("write region server last roll log result to backup system table");
593
594    try (Table table = connection.getTable(tableName)) {
595      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
596      table.put(put);
597    }
598  }
599
600  /**
601   * Retrieve all table names that are part of any known completed backup
602   */
603  public Set<TableName> getTablesIncludedInBackups() throws IOException {
604    // Incremental backups have the same tables as the preceding full backups
605    List<BackupInfo> infos =
606      getBackupHistory(withState(BackupState.COMPLETE), withType(BackupType.FULL));
607    return infos.stream().flatMap(info -> info.getTableNames().stream())
608      .collect(Collectors.toSet());
609  }
610
611  /**
612   * Goes through all backup history corresponding to the provided root folder, and collects all
613   * backup info mentioning each of the provided tables.
614   * @param set        the tables for which to collect the {@code BackupInfo}
615   * @param backupRoot backup destination path to retrieve backup history for
616   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
617   *         least one {@code BackupInfo}
618   * @throws IOException if getting the backup history fails
619   */
620  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
621    String backupRoot) throws IOException {
622    List<BackupInfo> history = getBackupHistory(withRoot(backupRoot));
623    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
624    for (BackupInfo info : history) {
625      List<TableName> tables = info.getTableNames();
626      for (TableName tableName : tables) {
627        if (set.contains(tableName)) {
628          List<BackupInfo> list =
629            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
630          list.add(info);
631        }
632      }
633    }
634    return tableHistoryMap;
635  }
636
637  /**
638   * Get all backup information passing the given filters, ordered by descending backupId. I.e. from
639   * newest to oldest.
640   */
641  public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException {
642    return getBackupHistory(Order.NEW_TO_OLD, Integer.MAX_VALUE, toInclude);
643  }
644
645  /**
646   * Retrieves the first n entries of the sorted, filtered list of backup infos.
647   * @param order desired ordering of the results.
648   * @param n     number of entries to return
649   */
650  public List<BackupInfo> getBackupHistory(Order order, int n, BackupInfo.Filter... toInclude)
651    throws IOException {
652    Preconditions.checkArgument(n >= 0, "n should be >= 0");
653    LOG.trace("get backup infos from backup system table");
654
655    if (n == 0) {
656      return Collections.emptyList();
657    }
658
659    Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude)
660      .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true);
661
662    Scan scan = createScanForBackupHistory(order);
663    List<BackupInfo> list = new ArrayList<>();
664
665    try (Table table = connection.getTable(tableName);
666      ResultScanner scanner = table.getScanner(scan)) {
667      Result res;
668      while ((res = scanner.next()) != null) {
669        res.advance();
670        BackupInfo context = cellToBackupInfo(res.current());
671        if (combinedPredicate.test(context)) {
672          list.add(context);
673          if (list.size() == n) {
674            break;
675          }
676        }
677      }
678      return list;
679    }
680  }
681
682  /**
683   * Write the current timestamps for each regionserver to backup system table after a successful
684   * full or incremental backup. The saved timestamp is of the last log file that was backed up
685   * already.
686   * @param tables        tables
687   * @param newTimestamps timestamps
688   * @param backupRoot    root directory path to backup
689   * @throws IOException exception
690   */
691  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
692    String backupRoot) throws IOException {
693    if (LOG.isTraceEnabled()) {
694      LOG.trace("write RS log time stamps to backup system table for tables ["
695        + StringUtils.join(tables, ",") + "]");
696    }
697    List<Put> puts = new ArrayList<>();
698    for (TableName table : tables) {
699      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
700      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
701      puts.add(put);
702    }
703    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
704      bufferedMutator.mutate(puts);
705    }
706  }
707
708  /**
709   * Read the timestamp for each region server log after the last successful backup. Each table has
710   * its own set of the timestamps. The info is stored for each table as a concatenated string of
711   * rs->timestapmp
712   * @param backupRoot root directory path to backup
713   * @return the timestamp for each region server. key: tableName value:
714   *         RegionServer,PreviousTimeStamp
715   * @throws IOException exception
716   */
717  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
718    throws IOException {
719    if (LOG.isTraceEnabled()) {
720      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
721    }
722
723    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
724
725    Scan scan = createScanForReadLogTimestampMap(backupRoot);
726    try (Table table = connection.getTable(tableName);
727      ResultScanner scanner = table.getScanner(scan)) {
728      Result res;
729      while ((res = scanner.next()) != null) {
730        res.advance();
731        Cell cell = res.current();
732        byte[] row = CellUtil.cloneRow(cell);
733        String tabName = getTableNameForReadLogTimestampMap(row);
734        TableName tn = TableName.valueOf(tabName);
735        byte[] data = CellUtil.cloneValue(cell);
736        if (data == null) {
737          throw new IOException("Data of last backup data from backup system table "
738            + "is empty. Create a backup first.");
739        }
740        if (data != null && data.length > 0) {
741          HashMap<String, Long> lastBackup =
742            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
743          tableTimestampMap.put(tn, lastBackup);
744        }
745      }
746      return tableTimestampMap;
747    }
748  }
749
750  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
751    Map<String, Long> map) {
752    BackupProtos.TableServerTimestamp.Builder tstBuilder =
753      BackupProtos.TableServerTimestamp.newBuilder();
754    tstBuilder
755      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
756
757    for (Entry<String, Long> entry : map.entrySet()) {
758      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
759      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
760      ServerName sn = ServerName.parseServerName(entry.getKey());
761      snBuilder.setHostName(sn.getHostname());
762      snBuilder.setPort(sn.getPort());
763      builder.setServerName(snBuilder.build());
764      builder.setTimestamp(entry.getValue());
765      tstBuilder.addServerTimestamp(builder.build());
766    }
767
768    return tstBuilder.build();
769  }
770
771  private HashMap<String, Long>
772    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {
773
774    HashMap<String, Long> map = new HashMap<>();
775    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
776    for (BackupProtos.ServerTimestamp st : list) {
777      ServerName sn =
778        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
779      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
780    }
781    return map;
782  }
783
784  /**
785   * Return the current tables covered by incremental backup.
786   * @param backupRoot root directory path to backup
787   * @return set of tableNames
788   * @throws IOException exception
789   */
790  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
791    LOG.trace("get incremental backup table set from backup system table");
792
793    TreeSet<TableName> set = new TreeSet<>();
794
795    try (Table table = connection.getTable(tableName)) {
796      Get get = createGetForIncrBackupTableSet(backupRoot);
797      Result res = table.get(get);
798      if (res.isEmpty()) {
799        return set;
800      }
801      List<Cell> cells = res.listCells();
802      for (Cell cell : cells) {
803        // qualifier = table name - we use table names as qualifiers
804        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
805      }
806      return set;
807    }
808  }
809
810  /**
811   * Add tables to global incremental backup set
812   * @param tables     set of tables
813   * @param backupRoot root directory path to backup
814   * @throws IOException exception
815   */
816  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
817    throws IOException {
818    if (LOG.isTraceEnabled()) {
819      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
820        + " tables [" + StringUtils.join(tables, " ") + "]");
821    }
822    if (LOG.isDebugEnabled()) {
823      tables.forEach(table -> LOG.debug(Objects.toString(table)));
824    }
825    try (Table table = connection.getTable(tableName)) {
826      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
827      table.put(put);
828    }
829  }
830
831  /**
832   * Deletes incremental backup set for a backup destination
833   * @param backupRoot backup root
834   */
835  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
836    if (LOG.isTraceEnabled()) {
837      LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
838    }
839    try (Table table = connection.getTable(tableName)) {
840      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
841      table.delete(delete);
842    }
843  }
844
845  /**
846   * Checks if we have at least one backup session in backup system table This API is used by
847   * BackupLogCleaner
848   * @return true, if at least one session exists in backup system table
849   * @throws IOException exception
850   */
851  public boolean hasBackupSessions() throws IOException {
852    LOG.trace("Has backup sessions from backup system table");
853
854    Scan scan = createScanForBackupHistory(Order.OLD_TO_NEW);
855    scan.setCaching(1);
856    try (Table table = connection.getTable(tableName);
857      ResultScanner scanner = table.getScanner(scan)) {
858      return scanner.next() != null;
859    }
860  }
861
862  /**
863   * BACKUP SETS
864   */
865
866  /**
867   * Get backup set list
868   * @return backup set list
869   * @throws IOException if a table or scanner operation fails
870   */
871  public List<String> listBackupSets() throws IOException {
872    LOG.trace("Backup set list");
873
874    List<String> list = new ArrayList<>();
875    try (Table table = connection.getTable(tableName)) {
876      Scan scan = createScanForBackupSetList();
877      scan.readVersions(1);
878      try (ResultScanner scanner = table.getScanner(scan)) {
879        Result res;
880        while ((res = scanner.next()) != null) {
881          res.advance();
882          list.add(cellKeyToBackupSetName(res.current()));
883        }
884        return list;
885      }
886    }
887  }
888
889  /**
890   * Get backup set description (list of tables)
891   * @param name set's name
892   * @return list of tables in a backup set
893   * @throws IOException if a table operation fails
894   */
895  public List<TableName> describeBackupSet(String name) throws IOException {
896    if (LOG.isTraceEnabled()) {
897      LOG.trace(" Backup set describe: " + name);
898    }
899    try (Table table = connection.getTable(tableName)) {
900      Get get = createGetForBackupSet(name);
901      Result res = table.get(get);
902      if (res.isEmpty()) {
903        return null;
904      }
905      res.advance();
906      String[] tables = cellValueToBackupSet(res.current());
907      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
908        .collect(Collectors.toList());
909    }
910  }
911
912  /**
913   * Add backup set (list of tables)
914   * @param name      set name
915   * @param newTables list of tables, comma-separated
916   * @throws IOException if a table operation fails
917   */
918  public void addToBackupSet(String name, String[] newTables) throws IOException {
919    if (LOG.isTraceEnabled()) {
920      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
921    }
922    String[] union = null;
923    try (Table table = connection.getTable(tableName)) {
924      Get get = createGetForBackupSet(name);
925      Result res = table.get(get);
926      if (res.isEmpty()) {
927        union = newTables;
928      } else {
929        res.advance();
930        String[] tables = cellValueToBackupSet(res.current());
931        union = merge(tables, newTables);
932      }
933      Put put = createPutForBackupSet(name, union);
934      table.put(put);
935    }
936  }
937
938  /**
939   * Remove tables from backup set (list of tables)
940   * @param name     set name
941   * @param toRemove list of tables
942   * @throws IOException if a table operation or deleting the backup set fails
943   */
944  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
945    if (LOG.isTraceEnabled()) {
946      LOG.trace(
947        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
948    }
949    String[] disjoint;
950    String[] tables;
951    try (Table table = connection.getTable(tableName)) {
952      Get get = createGetForBackupSet(name);
953      Result res = table.get(get);
954      if (res.isEmpty()) {
955        LOG.warn("Backup set '" + name + "' not found.");
956        return;
957      } else {
958        res.advance();
959        tables = cellValueToBackupSet(res.current());
960        disjoint = disjoin(tables, toRemove);
961      }
962      if (disjoint.length > 0 && disjoint.length != tables.length) {
963        Put put = createPutForBackupSet(name, disjoint);
964        table.put(put);
965      } else if (disjoint.length == tables.length) {
966        LOG.warn("Backup set '" + name + "' does not contain tables ["
967          + StringUtils.join(toRemove, " ") + "]");
968      } else { // disjoint.length == 0 and tables.length >0
969        // Delete backup set
970        LOG.info("Backup set '" + name + "' is empty. Deleting.");
971        deleteBackupSet(name);
972      }
973    }
974  }
975
976  private String[] merge(String[] existingTables, String[] newTables) {
977    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
978    tables.addAll(Arrays.asList(newTables));
979    return tables.toArray(new String[0]);
980  }
981
982  private String[] disjoin(String[] existingTables, String[] toRemove) {
983    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
984    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
985    return tables.toArray(new String[0]);
986  }
987
988  /**
989   * Delete backup set
990   * @param name set's name
991   * @throws IOException if getting or deleting the table fails
992   */
993  public void deleteBackupSet(String name) throws IOException {
994    if (LOG.isTraceEnabled()) {
995      LOG.trace(" Backup set delete: " + name);
996    }
997    try (Table table = connection.getTable(tableName)) {
998      Delete del = createDeleteForBackupSet(name);
999      table.delete(del);
1000    }
1001  }
1002
1003  /**
1004   * Get backup system table descriptor
1005   * @return table's descriptor
1006   */
1007  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
1008    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
1009
1010    ColumnFamilyDescriptorBuilder colBuilder =
1011      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
1012
1013    colBuilder.setMaxVersions(1);
1014    Configuration config = HBaseConfiguration.create();
1015    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
1016      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1017    colBuilder.setTimeToLive(ttl);
1018
1019    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
1020    builder.setColumnFamily(colSessionsDesc);
1021
1022    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
1023    colBuilder.setTimeToLive(ttl);
1024    builder.setColumnFamily(colBuilder.build());
1025    return builder.build();
1026  }
1027
1028  public static TableName getTableName(Configuration conf) {
1029    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
1030      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
1031    return TableName.valueOf(name);
1032  }
1033
1034  public static String getTableNameAsString(Configuration conf) {
1035    return getTableName(conf).getNameAsString();
1036  }
1037
1038  public static String getSnapshotName(Configuration conf) {
1039    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
1040  }
1041
1042  /**
1043   * Get backup system table descriptor
1044   * @return table's descriptor
1045   */
1046  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
1047    TableDescriptorBuilder builder =
1048      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
1049
1050    ColumnFamilyDescriptorBuilder colBuilder =
1051      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
1052    colBuilder.setMaxVersions(1);
1053    Configuration config = HBaseConfiguration.create();
1054    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
1055      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1056    colBuilder.setTimeToLive(ttl);
1057    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
1058    builder.setColumnFamily(colSessionsDesc);
1059    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
1060    colBuilder.setTimeToLive(ttl);
1061    builder.setColumnFamily(colBuilder.build());
1062    return builder.build();
1063  }
1064
1065  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
1066    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
1067      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
1068    return TableName.valueOf(name);
1069  }
1070
1071  /**
1072   * Creates Put operation for a given backup info object
1073   * @param context backup info
1074   * @return put operation
1075   * @throws IOException exception
1076   */
1077  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
1078    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
1079    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
1080      context.toByteArray());
1081    return put;
1082  }
1083
1084  /**
1085   * Creates Get operation for a given backup id
1086   * @param backupId backup's ID
1087   * @return get operation
1088   * @throws IOException exception
1089   */
1090  private Get createGetForBackupInfo(String backupId) throws IOException {
1091    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
1092    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1093    get.readVersions(1);
1094    return get;
1095  }
1096
1097  /**
1098   * Creates Delete operation for a given backup id
1099   * @param backupId backup's ID
1100   * @return delete operation
1101   */
1102  private Delete createDeleteForBackupInfo(String backupId) {
1103    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
1104    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1105    return del;
1106  }
1107
1108  /**
1109   * Converts Result to BackupInfo
1110   * @param res HBase result
1111   * @return backup info instance
1112   * @throws IOException exception
1113   */
1114  private BackupInfo resultToBackupInfo(Result res) throws IOException {
1115    res.advance();
1116    Cell cell = res.current();
1117    return cellToBackupInfo(cell);
1118  }
1119
1120  /**
1121   * Creates Get operation to retrieve start code from backup system table
1122   * @return get operation
1123   * @throws IOException exception
1124   */
1125  private Get createGetForStartCode(String rootPath) throws IOException {
1126    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
1127    get.addFamily(BackupSystemTable.META_FAMILY);
1128    get.readVersions(1);
1129    return get;
1130  }
1131
1132  /**
1133   * Creates Put operation to store start code to backup system table
1134   * @return put operation
1135   */
1136  private Put createPutForStartCode(String startCode, String rootPath) {
1137    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
1138    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
1139      Bytes.toBytes(startCode));
1140    return put;
1141  }
1142
1143  /**
1144   * Creates Get to retrieve incremental backup table set from backup system table
1145   * @return get operation
1146   * @throws IOException exception
1147   */
1148  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
1149    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
1150    get.addFamily(BackupSystemTable.META_FAMILY);
1151    get.readVersions(1);
1152    return get;
1153  }
1154
1155  /**
1156   * Creates Put to store incremental backup table set
1157   * @param tables tables
1158   * @return put operation
1159   */
1160  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
1161    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
1162    for (TableName table : tables) {
1163      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
1164        EMPTY_VALUE);
1165    }
1166    return put;
1167  }
1168
1169  /**
1170   * Creates Delete for incremental backup table set
1171   * @param backupRoot backup root
1172   * @return delete operation
1173   */
1174  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
1175    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
1176    delete.addFamily(BackupSystemTable.META_FAMILY);
1177    return delete;
1178  }
1179
1180  /**
1181   * Creates Scan operation to load backup history
1182   * @param order order of the scan results
1183   * @return scan operation
1184   */
1185  private Scan createScanForBackupHistory(Order order) {
1186    Scan scan = new Scan();
1187    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
1188    if (order == Order.NEW_TO_OLD) {
1189      byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1190      stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1191      scan.setReversed(true);
1192      scan.withStartRow(stopRow, false);
1193      scan.withStopRow(startRow);
1194    } else if (order == Order.OLD_TO_NEW) {
1195      scan.setStartStopRowForPrefixScan(startRow);
1196    } else {
1197      throw new IllegalArgumentException("Unsupported order: " + order);
1198    }
1199    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1200    scan.readVersions(1);
1201    return scan;
1202  }
1203
1204  /**
1205   * Converts cell to backup info instance.
1206   * @param current current cell
1207   * @return backup backup info instance
1208   * @throws IOException exception
1209   */
1210  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
1211    byte[] data = CellUtil.cloneValue(current);
1212    return BackupInfo.fromByteArray(data);
1213  }
1214
1215  /**
1216   * Creates Put to write RS last roll log timestamp map
1217   * @param table table
1218   * @param smap  map, containing RS:ts
1219   * @return put operation
1220   */
1221  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
1222    String backupRoot) {
1223    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
1224    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
1225    return put;
1226  }
1227
1228  /**
1229   * Creates Scan to load table-> { RS -> ts} map of maps
1230   * @return scan operation
1231   */
1232  private Scan createScanForReadLogTimestampMap(String backupRoot) {
1233    Scan scan = new Scan();
1234    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
1235    scan.addFamily(BackupSystemTable.META_FAMILY);
1236
1237    return scan;
1238  }
1239
1240  /**
1241   * Get table name from rowkey
1242   * @param cloneRow rowkey
1243   * @return table name
1244   */
1245  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
1246    String s = Bytes.toString(cloneRow);
1247    int index = s.lastIndexOf(NULL);
1248    return s.substring(index + 1);
1249  }
1250
1251  /**
1252   * Creates Put to store RS last log result
1253   * @param server    server name
1254   * @param timestamp log roll result (timestamp)
1255   * @return put operation
1256   */
1257  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
1258    String backupRoot) {
1259    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
1260    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
1261      Bytes.toBytes(timestamp));
1262    return put;
1263  }
1264
1265  /**
1266   * Creates Scan operation to load last RS log roll results
1267   * @return scan operation
1268   */
1269  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
1270    Scan scan = new Scan();
1271    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
1272    scan.addFamily(BackupSystemTable.META_FAMILY);
1273    scan.readVersions(1);
1274
1275    return scan;
1276  }
1277
1278  /**
1279   * Get server's name from rowkey
1280   * @param row rowkey
1281   * @return server's name
1282   */
1283  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
1284    String s = Bytes.toString(row);
1285    int index = s.lastIndexOf(NULL);
1286    return s.substring(index + 1);
1287  }
1288
1289  /**
1290   * Creates Put's for bulk loads.
1291   */
1292  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
1293    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
1294    List<Put> puts = new ArrayList<>();
1295    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
1296      for (Path path : entry.getValue()) {
1297        String file = path.toString();
1298        int lastSlash = file.lastIndexOf("/");
1299        String filename = file.substring(lastSlash + 1);
1300        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1301          Bytes.toString(region), BLK_LD_DELIM, filename));
1302        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1303        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
1304        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1305        puts.add(put);
1306        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
1307      }
1308    }
1309    return puts;
1310  }
1311
1312  public static void snapshot(Connection conn) throws IOException {
1313    try (Admin admin = conn.getAdmin()) {
1314      Configuration conf = conn.getConfiguration();
1315      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
1316    }
1317  }
1318
1319  public static void restoreFromSnapshot(Connection conn) throws IOException {
1320    Configuration conf = conn.getConfiguration();
1321    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
1322    try (Admin admin = conn.getAdmin()) {
1323      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1324      if (snapshotExists(admin, snapshotName)) {
1325        admin.restoreBackupSystemTable(snapshotName);
1326        LOG.debug("Done restoring backup system table");
1327      } else {
1328        // Snapshot does not exists, i.e completeBackup failed after
1329        // deleting backup system table snapshot
1330        // In this case we log WARN and proceed
1331        LOG.warn(
1332          "Could not restore backup system table. Snapshot " + snapshotName + " does not exists.");
1333      }
1334    }
1335  }
1336
1337  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
1338    List<SnapshotDescription> list = admin.listSnapshots();
1339    for (SnapshotDescription desc : list) {
1340      if (desc.getName().equals(snapshotName)) {
1341        return true;
1342      }
1343    }
1344    return false;
1345  }
1346
1347  public static boolean snapshotExists(Connection conn) throws IOException {
1348    return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
1349  }
1350
1351  public static void deleteSnapshot(Connection conn) throws IOException {
1352    Configuration conf = conn.getConfiguration();
1353    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
1354    try (Admin admin = conn.getAdmin()) {
1355      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1356      if (snapshotExists(admin, snapshotName)) {
1357        admin.deleteSnapshot(snapshotName);
1358        LOG.debug("Done deleting backup system table snapshot");
1359      } else {
1360        LOG.error("Snapshot " + snapshotName + " does not exists");
1361      }
1362    }
1363  }
1364
1365  private Put createPutForDeleteOperation(String[] backupIdList) {
1366    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1367    Put put = new Put(DELETE_OP_ROW);
1368    put.addColumn(META_FAMILY, FAM_COL, value);
1369    return put;
1370  }
1371
1372  private Delete createDeleteForBackupDeleteOperation() {
1373    Delete delete = new Delete(DELETE_OP_ROW);
1374    delete.addFamily(META_FAMILY);
1375    return delete;
1376  }
1377
1378  private Get createGetForDeleteOperation() {
1379    Get get = new Get(DELETE_OP_ROW);
1380    get.addFamily(META_FAMILY);
1381    return get;
1382  }
1383
1384  public void startDeleteOperation(String[] backupIdList) throws IOException {
1385    if (LOG.isTraceEnabled()) {
1386      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
1387    }
1388    Put put = createPutForDeleteOperation(backupIdList);
1389    try (Table table = connection.getTable(tableName)) {
1390      table.put(put);
1391    }
1392  }
1393
1394  public void finishDeleteOperation() throws IOException {
1395    LOG.trace("Finsih delete operation for backup ids");
1396
1397    Delete delete = createDeleteForBackupDeleteOperation();
1398    try (Table table = connection.getTable(tableName)) {
1399      table.delete(delete);
1400    }
1401  }
1402
1403  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
1404    LOG.trace("Get delete operation for backup ids");
1405
1406    Get get = createGetForDeleteOperation();
1407    try (Table table = connection.getTable(tableName)) {
1408      Result res = table.get(get);
1409      if (res.isEmpty()) {
1410        return null;
1411      }
1412      Cell cell = res.listCells().get(0);
1413      byte[] val = CellUtil.cloneValue(cell);
1414      if (val.length == 0) {
1415        return null;
1416      }
1417      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1418        .toArray(String[]::new);
1419    }
1420  }
1421
1422  private Put createPutForMergeOperation(String[] backupIdList) {
1423    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1424    Put put = new Put(MERGE_OP_ROW);
1425    put.addColumn(META_FAMILY, FAM_COL, value);
1426    return put;
1427  }
1428
1429  public boolean isMergeInProgress() throws IOException {
1430    Get get = new Get(MERGE_OP_ROW);
1431    try (Table table = connection.getTable(tableName)) {
1432      Result res = table.get(get);
1433      return !res.isEmpty();
1434    }
1435  }
1436
1437  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
1438    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
1439    Put put = new Put(MERGE_OP_ROW);
1440    put.addColumn(META_FAMILY, PATH_COL, value);
1441    return put;
1442  }
1443
1444  private Delete createDeleteForBackupMergeOperation() {
1445    Delete delete = new Delete(MERGE_OP_ROW);
1446    delete.addFamily(META_FAMILY);
1447    return delete;
1448  }
1449
1450  private Get createGetForMergeOperation() {
1451    Get get = new Get(MERGE_OP_ROW);
1452    get.addFamily(META_FAMILY);
1453    return get;
1454  }
1455
1456  public void startMergeOperation(String[] backupIdList) throws IOException {
1457    if (LOG.isTraceEnabled()) {
1458      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
1459    }
1460    Put put = createPutForMergeOperation(backupIdList);
1461    try (Table table = connection.getTable(tableName)) {
1462      table.put(put);
1463    }
1464  }
1465
1466  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
1467    if (LOG.isTraceEnabled()) {
1468      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
1469    }
1470    Put put = createPutForUpdateTablesForMerge(tables);
1471    try (Table table = connection.getTable(tableName)) {
1472      table.put(put);
1473    }
1474  }
1475
1476  public void finishMergeOperation() throws IOException {
1477    LOG.trace("Finish merge operation for backup ids");
1478
1479    Delete delete = createDeleteForBackupMergeOperation();
1480    try (Table table = connection.getTable(tableName)) {
1481      table.delete(delete);
1482    }
1483  }
1484
1485  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
1486    LOG.trace("Get backup ids for merge operation");
1487
1488    Get get = createGetForMergeOperation();
1489    try (Table table = connection.getTable(tableName)) {
1490      Result res = table.get(get);
1491      if (res.isEmpty()) {
1492        return null;
1493      }
1494      Cell cell = res.listCells().get(0);
1495      byte[] val = CellUtil.cloneValue(cell);
1496      if (val.length == 0) {
1497        return null;
1498      }
1499      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1500        .toArray(String[]::new);
1501    }
1502  }
1503
1504  /**
1505   * Creates a scan to read all registered bulk loads for the given table, or for all tables if
1506   * {@code table} is {@code null}.
1507   */
1508  static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) {
1509    Scan scan = new Scan();
1510    byte[] startRow = table == null
1511      ? BULK_LOAD_PREFIX_BYTES
1512      : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
1513    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1514    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1515    scan.withStartRow(startRow);
1516    scan.withStopRow(stopRow);
1517    scan.addFamily(BackupSystemTable.META_FAMILY);
1518    scan.readVersions(1);
1519    return scan;
1520  }
1521
1522  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
1523    // format is bulk : namespace : table : region : file
1524    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
1525  }
1526
1527  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
1528    // format is bulk : namespace : table : region : file
1529    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
1530    Iterator<String> i = parts.iterator();
1531    int idx = 3;
1532    if (parts.size() == 4) {
1533      // the table is in default namespace
1534      idx = 2;
1535    }
1536    String region = Iterators.get(i, idx);
1537    LOG.debug("bulk row string " + rowStr + " region " + region);
1538    return region;
1539  }
1540
  /**
   * Used to query bulk loaded hfiles which have been copied by incremental backup
   * @param backupId the backup Id. It can be null when querying for all tables
   * @return the Scan object
   * @deprecated This method is broken if a backupId is specified - see HBASE-28715
   */
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    // NOTE(review): when backupId is non-null the prefix is BULK_LOAD_PREFIX + backupId, but
    // rows are written keyed by table/region/file (see createPutForBulkLoad), so the scan
    // matches nothing — this is the HBASE-28715 breakage mentioned above.
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    // Exclusive upper bound: copy of the prefix with its last byte incremented.
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }
1559
1560  /**
1561   * Creates Scan operation to load backup set list
1562   * @return scan operation
1563   */
1564  private Scan createScanForBackupSetList() {
1565    Scan scan = new Scan();
1566    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
1567    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1568    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1569    scan.withStartRow(startRow);
1570    scan.withStopRow(stopRow);
1571    scan.addFamily(BackupSystemTable.META_FAMILY);
1572    return scan;
1573  }
1574
1575  /**
1576   * Creates Get operation to load backup set content
1577   * @return get operation
1578   */
1579  private Get createGetForBackupSet(String name) {
1580    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
1581    get.addFamily(BackupSystemTable.META_FAMILY);
1582    return get;
1583  }
1584
1585  /**
1586   * Creates Delete operation to delete backup set content
1587   * @param name backup set's name
1588   * @return delete operation
1589   */
1590  private Delete createDeleteForBackupSet(String name) {
1591    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
1592    del.addFamily(BackupSystemTable.META_FAMILY);
1593    return del;
1594  }
1595
1596  /**
1597   * Creates Put operation to update backup set content
1598   * @param name   backup set's name
1599   * @param tables list of tables
1600   * @return put operation
1601   */
1602  private Put createPutForBackupSet(String name, String[] tables) {
1603    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
1604    byte[] value = convertToByteArray(tables);
1605    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
1606    return put;
1607  }
1608
1609  private byte[] convertToByteArray(String[] tables) {
1610    return Bytes.toBytes(StringUtils.join(tables, ","));
1611  }
1612
1613  /**
1614   * Converts cell to backup set list.
1615   * @param current current cell
1616   * @return backup set as array of table names
1617   */
1618  private String[] cellValueToBackupSet(Cell current) {
1619    byte[] data = CellUtil.cloneValue(current);
1620    if (!ArrayUtils.isEmpty(data)) {
1621      return Bytes.toString(data).split(",");
1622    }
1623    return new String[0];
1624  }
1625
1626  /**
1627   * Converts cell key to backup set name.
1628   * @param current current cell
1629   * @return backup set name
1630   */
1631  private String cellKeyToBackupSetName(Cell current) {
1632    byte[] data = CellUtil.cloneRow(current);
1633    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
1634  }
1635
1636  private static byte[] rowkey(String s, String... other) {
1637    StringBuilder sb = new StringBuilder(s);
1638    for (String ss : other) {
1639      sb.append(ss);
1640    }
1641    return Bytes.toBytes(sb.toString());
1642  }
1643
1644  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
1645    if (!admin.isTableEnabled(tableName)) {
1646      try {
1647        admin.enableTable(tableName);
1648      } catch (TableNotDisabledException ignored) {
1649        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
1650      }
1651    }
1652  }
1653
  public enum Order {
    /**
     * Old backups first, most recent ones last. I.e. sorted by ascending backupId.
     */
    OLD_TO_NEW,
    /**
     * New backups first, oldest last. I.e. sorted by descending backupId.
     */
    NEW_TO_OLD
  }
1664}