/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 * Backup system table schema:<br>
 * <p>
 * <ul>
 * <li>1. Backup sessions rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>3. Incremental backup set rowkey = "incrbackupset:" + backupRoot; table = "meta:" + table
 * name of included table; value = empty</li>
 * <li>4. Table-RS-timestamp map rowkey = "trslm:" + backupRoot + table_name; value = map[RS -> last
 * WAL timestamp]</li>
 * <li>5. RS - WAL ts map rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>6. WALs recorded rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ul>
 * </p>
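 * <p>
 * Minimal usage sketch (illustrative only; assumes an existing {@code Connection conn} and a
 * hypothetical backup id):
 * </p>
 *
 * <pre>
 * try (BackupSystemTable systemTable = new BackupSystemTable(conn)) {
 *   BackupInfo info = systemTable.readBackupInfo("backup_1700000000000");
 *   List&lt;BackupInfo&gt; history = systemTable.getBackupHistory();
 * }
 * </pre>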
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk-loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge, etc.) from
   * the activity of the RegionObserver that controls the bulk loading process:
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        createSystemTable(admin, backupHTD);
      }
      ensureTableEnabled(admin, tableName);
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        createSystemTable(admin, blHTD);
      }
      ensureTableEnabled(admin, bulkLoadTableName);
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
    try {
      admin.createTable(descriptor);
    } catch (TableExistsException e) {
      // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
      // so may be subject to race conditions where one caller succeeds in creating the
      // table and others fail because it now exists
      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      try {
        admin.createNamespace(ns);
      } catch (NamespaceExistException e) {
        // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
        // so may be subject to race conditions where one caller succeeds in creating the
        // namespace and others fail because it now exists
        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
      }
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    long TIMEOUT = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not present or available yet, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException().initCause(e);
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
        throw new IOException(
          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates the status (state) of a backup session in the backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /**
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /**
   * Deletes backup status from the backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /**
   * Registers a bulk load.
   * @param tableName     table name
   * @param region        the region receiving hfile
   * @param cfToHfilePath column family and associated hfiles
   */
  public void registerBulkLoad(TableName tableName, byte[] region,
    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
        cfToHfilePath.size());
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
      bufferedMutator.mutate(puts);
      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
    }
  }
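
  /*
   * Illustrative registration sketch (names are hypothetical; assumes an existing
   * BackupSystemTable "systemTable", a region encoded name, and HFile paths produced by a bulk
   * load):
   *
   *   Map<byte[], List<Path>> cfToHfilePath = new TreeMap<>(Bytes.BYTES_COMPARATOR);
   *   cfToHfilePath.put(Bytes.toBytes("cf"), Arrays.asList(new Path("/staging/cf/hfile1")));
   *   systemTable.registerBulkLoad(TableName.valueOf("ns:demo"), regionEncodedName,
   *     cfToHfilePath);
   */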

  /**
   * Removes entries from the table that tracks all bulk loaded hfiles.
   * @param rows the row keys of the entries to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Delete> deletes = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        deletes.add(del);
        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
      }
      bufferedMutator.mutate(deletes);
      LOG.debug("Deleted {} bulk load entries.", rows.size());
    }
  }

  /**
   * Reads all registered bulk loads.
   */
  public List<BulkLoad> readBulkloadRows() throws IOException {
    Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
    return processBulkLoadRowScan(scan);
  }

  /**
   * Reads the registered bulk loads for the given tables.
   */
  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    for (TableName table : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
      result.addAll(processBulkLoadRowScan(scan));
    }
    return result;
  }

  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName table = null;
        String fam = null;
        String path = null;
        String region = null;
        byte[] row = null;
        for (Cell cell : res.listCells()) {
          row = CellUtil.cloneRow(cell);
          String rowStr = Bytes.toString(row);
          region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            table = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = Bytes.toString(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        result.add(new BulkLoad(table, region, fam, path, row));
        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
      }
    }
    return result;
  }

  /**
   * Reads backup status object (instance of backup info) from the backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in HBase or the stored value has length 0. Both cases indicate that no
   * successful backup has completed so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val, StandardCharsets.UTF_8);
    }
  }

  /**
   * Write the start code (timestamp) to the backup system table. If null is passed in, a
   * zero-length value is written.
   * @param startCode  start code
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      // Guard against null so the documented "write zero-length value" behavior holds instead
      // of throwing a NullPointerException.
      String code = startCode == null ? "" : startCode.toString();
      Put put = createPutForStartCode(code, backupRoot);
      table.put(put);
    }
  }
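
  /*
   * Start-code round trip sketch (illustrative; assumes an existing BackupSystemTable
   * "systemTable" and a backup root path string "backupRoot"):
   *
   *   systemTable.writeBackupStartCode(EnvironmentEdgeManager.currentTime(), backupRoot);
   *   String code = systemTable.readBackupStartCode(backupRoot); // null if nothing stored yet
   */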

  /**
   * Starts a new backup exclusive operation. Exclusive operations are: create, delete, merge.
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }
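
  /*
   * Usage sketch of the exclusive-operation handshake (illustrative, not part of the API;
   * assumes an existing BackupSystemTable "systemTable"):
   *
   *   systemTable.startBackupExclusiveOperation();
   *   try {
   *     // run the create/delete/merge work here
   *   } finally {
   *     systemTable.finishBackupExclusiveOperation();
   *   }
   */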

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from the backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last roll log result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to the backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all completed backup information (in desc order by time)
   * @param onlyCompleted true if only successfully completed sessions should be returned
   * @return history info of BackupCompleteData
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n number of records; if n == -1, all records are returned
   * @return list of records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }

  /**
   * Get backup history records filtered by list of filters.
   * @param n       max number of records; if n == -1, the limit is ignored
   * @param filters list of filters
   * @return backup records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }

      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }
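
  /*
   * Filter usage sketch (illustrative; assuming BackupInfo.Filter is a functional interface
   * whose single method is apply(BackupInfo), as used above, a lambda works):
   *
   *   BackupInfo.Filter completedOnly = info -> info.getState() == BackupState.COMPLETE;
   *   List<BackupInfo> lastTenCompleted = systemTable.getBackupHistory(10, completedOnly);
   */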

  /**
   * Retrieve all table names that are part of any known backup
   */
  public Set<TableName> getTablesIncludedInBackups() throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      // Incremental backups have the same tables as the preceding full backups
      if (info.getType() == BackupType.FULL) {
        names.addAll(info.getTableNames());
      }
    }
    return names;
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      BackupInfo info = iterator.next();
      if (!backupRoot.equals(info.getBackupRootDir())) {
        iterator.remove();
      }
    }
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  /**
   * Goes through all backup history corresponding to the provided root folder, and collects all
   * backup info mentioning each of the provided tables.
   * @param set        the tables for which to collect the {@code BackupInfo}
   * @param backupRoot backup destination path to retrieve backup history for
   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
   *         least one {@code BackupInfo}
   * @throws IOException if getting the backup history fails
   */
  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          List<BackupInfo> list =
            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to backup system table after a successful
   * full or incremental backup. The saved timestamp is of the last log file that was backed up
   * already.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
      bufferedMutator.mutate(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of the timestamps. The info is stored for each table as a concatenated string of
   * rs->timestamp
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName value:
   *         RegionServer,PreviousTimeStamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        // cloneValue never returns null, so fold the null and zero-length cases into one check;
        // the original separate null test was unreachable.
        if (data == null || data.length == 0) {
          throw new IOException("Data of last backup data from backup system table "
            + "is empty. Create a backup first.");
        }
        HashMap<String, Long> lastBackup =
          fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
        tableTimestampMap.put(tn, lastBackup);
      }
      return tableTimestampMap;
    }
  }
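
  /*
   * Shape of the structure returned by readLogTimestampMap (illustrative values):
   *
   *   { "ns:table1" -> { "host1.example.com:16020" -> 1700000001000L,
   *                      "host2.example.com:16020" -> 1700000002000L } }
   *
   * Write/read round trip sketch (assumes an existing BackupSystemTable "systemTable" and a
   * backup root string "backupRoot"):
   *
   *   Map<String, Long> rsTimestamps = new HashMap<>();
   *   rsTimestamps.put("host1.example.com:16020", 1700000001000L);
   *   systemTable.writeRegionServerLogTimestamp(
   *     Collections.singleton(TableName.valueOf("ns:table1")), rsTimestamps, backupRoot);
   *   Map<TableName, Map<String, Long>> read = systemTable.readLogTimestampMap(backupRoot);
   */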

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in the backup system table. This API is used by
   * BackupLogCleaner.
   * @return true if at least one session exists in the backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    boolean result = false;
    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      if (scanner.next() != null) {
        result = true;
      }
      return result;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in the backup set, or null if the set does not exist
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.stream(tables).map(item -> TableName.valueOf(item))
        .collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
   * @param name      set name
   * @param newTables tables to add, as an array of table names
   * @throws IOException if a table operation fails
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }
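
  /*
   * Backup set round trip sketch (set and table names are hypothetical; assumes an existing
   * BackupSystemTable "systemTable"):
   *
   *   systemTable.addToBackupSet("nightly", new String[] { "ns:table1", "ns:table2" });
   *   List<TableName> members = systemTable.describeBackupSet("nightly"); // two tables
   *   systemTable.removeFromBackupSet("nightly", new String[] { "ns:table2" });
   *   systemTable.deleteBackupSet("nightly");
   */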

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Backup set remove from: " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
    return tables.toArray(new String[0]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }
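
  /*
   * The system table name is configurable. Override sketch (the value shown is hypothetical):
   *
   *   conf.set(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, "backup:system_custom");
   *   TableName name = BackupSystemTable.getTableName(conf);
   */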

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get descriptor of the backup system table for bulk loaded data
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }
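
  /*
   * Worked example of the stop-row construction above: the start row is the prefix "session:",
   * whose last byte ':' is 0x3A; incrementing it yields the exclusive stop row "session;" (0x3B),
   * so the scan covers exactly the rows that start with the prefix.
   */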

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> {RS -> ts} map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Puts for bulk loads.
   */
  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
      for (Path path : entry.getValue()) {
        String file = path.toString();
        int lastSlash = file.lastIndexOf("/");
        String filename = file.substring(lastSlash + 1);
        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
          Bytes.toString(region), BLK_LD_DELIM, filename));
        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
        puts.add(put);
        LOG.debug("Created put for bulk load path {} for {} {}", file, table,
          Bytes.toString(region));
      }
    }
    return puts;
  }

  public static void snapshot(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      Configuration conf = conn.getConfiguration();
      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
    }
  }

  public static void restoreFromSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.disableTable(BackupSystemTable.getTableName(conf));
        admin.restoreSnapshot(snapshotName);
        admin.enableTable(BackupSystemTable.getTableName(conf));
        LOG.debug("Done restoring backup system table");
      } else {
        // Snapshot does not exist, i.e. completeBackup failed after
        // deleting the backup system table snapshot.
        // In this case we log a WARN and proceed.
        LOG.warn(
          "Could not restore backup system table. Snapshot " + snapshotName + " does not exist.");
      }
    }
  }
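
  /*
   * Snapshot lifecycle sketch (illustrative; assumes an existing Connection "conn"):
   *
   *   BackupSystemTable.snapshot(conn);            // taken before a destructive operation
   *   // ... the operation fails ...
   *   BackupSystemTable.restoreFromSnapshot(conn); // roll the system table back
   *   BackupSystemTable.deleteSnapshot(conn);      // cleanup once the table is consistent again
   */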

  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
    List<SnapshotDescription> list = admin.listSnapshots();
    for (SnapshotDescription desc : list) {
      if (desc.getName().equals(snapshotName)) {
        return true;
      }
    }
    return false;
  }

  public static boolean snapshotExists(Connection conn) throws IOException {
    // Close the Admin explicitly to avoid leaking it.
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }

  public static void deleteSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Done deleting backup system table snapshot");
      } else {
        LOG.error("Snapshot " + snapshotName + " does not exist");
      }
    }
  }
1447
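  /**
   * Creates a Put that records the backup IDs targeted by an in-progress delete operation as a
   * comma-separated list under the single {@code DELETE_OP_ROW} marker row.
   */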
  private Put createPutForDeleteOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(DELETE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  private Delete createDeleteForBackupDeleteOperation() {
    Delete delete = new Delete(DELETE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForDeleteOperation() {
    Get get = new Get(DELETE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

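  /**
   * Registers the start of a backup delete operation by persisting the affected backup IDs. The
   * IDs stay readable via {@link #getListOfBackupIdsFromDeleteOperation()} until
   * {@link #finishDeleteOperation()} clears the marker row, so an interrupted delete can be
   * detected and repaired. A minimal sketch of the pairing (illustrative; the backup IDs shown are
   * made up):
   *
   * <pre>{@code
   * systemTable.startDeleteOperation(new String[] { "backup_123", "backup_456" });
   * // ... delete the backup images ...
   * systemTable.finishDeleteOperation();
   * }</pre>
   */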
  public void startDeleteOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForDeleteOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishDeleteOperation() throws IOException {
    LOG.trace("Finish delete operation for backup ids");

    Delete delete = createDeleteForBackupDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

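  /**
   * Returns the backup IDs recorded by an unfinished delete operation, or {@code null} if no
   * delete operation is in progress.
   */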
  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
    LOG.trace("Get backup ids for delete operation");

    Get get = createGetForDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  private Put createPutForMergeOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

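  /**
   * Returns whether a backup merge operation is currently registered in the backup system table.
   */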
  public boolean isMergeInProgress() throws IOException {
    Get get = new Get(MERGE_OP_ROW);
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      return !res.isEmpty();
    }
  }

  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, PATH_COL, value);
    return put;
  }

  private Delete createDeleteForBackupMergeOperation() {
    Delete delete = new Delete(MERGE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForMergeOperation() {
    Get get = new Get(MERGE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

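  /**
   * Registers the start of a backup merge operation by persisting the backup IDs being merged.
   * Mirrors the delete-operation bookkeeping: the IDs stay readable via
   * {@link #getListOfBackupIdsFromMergeOperation()} until {@link #finishMergeOperation()} removes
   * the marker row.
   */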
  public void startMergeOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForMergeOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update tables for merge: " + StringUtils.join(tables, ","));
    }
    Put put = createPutForUpdateTablesForMerge(tables);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishMergeOperation() throws IOException {
    LOG.trace("Finish merge operation for backup ids");

    Delete delete = createDeleteForBackupMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

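  /**
   * Returns the backup IDs recorded by an unfinished merge operation, or {@code null} if no merge
   * operation is in progress.
   */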
  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
    LOG.trace("Get backup ids for merge operation");

    Get get = createGetForMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  /**
   * Creates a scan to read all registered bulk loads for the given table, or for all tables if
   * {@code table} is {@code null}.
   */
  static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) {
    Scan scan = new Scan();
    byte[] startRow = table == null
      ? BULK_LOAD_PREFIX_BYTES
      : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
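    // Build an exclusive stop row for the prefix scan by incrementing the last byte of the start
    // row; rows sharing the prefix sort below this bound.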
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

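  /**
   * Extracts the second component of an original-bulk-load row key; for tables in the default
   * namespace this is the table name.
   */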
  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
  }

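  /**
   * Extracts the region component of an original-bulk-load row key, accounting for default
   * namespace tables, whose row keys carry one fewer component.
   */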
  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
    // A table in the default namespace has no namespace component, so the row has one fewer part
    int idx = parts.size() == 4 ? 2 : 3;
    String region = parts.get(idx);
    LOG.debug("bulk row string {} region {}", rowStr, region);
    return region;
  }

  /**
   * Used to query bulk loaded hfiles which have been copied by incremental backup.
   * @param backupId the backup ID; it can be null when querying for all tables
   * @return the Scan object
   * @deprecated This method is broken if a backupId is specified - see HBASE-28715
   */
  @Deprecated
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    // Same exclusive-stop-row construction as in createScanForOrigBulkLoadedFiles above
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Creates a Scan operation to load the backup set list
   * @return scan operation
   */
  private Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates a Get operation to load backup set content
   * @param name backup set's name
   * @return get operation
   */
  private Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates a Delete operation to delete backup set content
   * @param name backup set's name
   * @return delete operation
   */
  private Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates a Put operation to update backup set content
   * @param name   backup set's name
   * @param tables list of tables
   * @return put operation
   */
  private Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
    return put;
  }

  private byte[] convertToByteArray(String[] tables) {
    return Bytes.toBytes(StringUtils.join(tables, ","));
  }

  /**
   * Converts a cell value to a backup set list.
   * @param current current cell
   * @return backup set as array of table names
   */
  private String[] cellValueToBackupSet(Cell current) {
    byte[] data = CellUtil.cloneValue(current);
    if (!ArrayUtils.isEmpty(data)) {
      return Bytes.toString(data).split(",");
    }
    return new String[0];
  }

  /**
   * Converts a cell key to a backup set name.
   * @param current current cell
   * @return backup set name
   */
  private String cellKeyToBackupSetName(Cell current) {
    byte[] data = CellUtil.cloneRow(current);
    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
  }

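  /**
   * Concatenates the given strings and returns the result as a row key byte array.
   */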
  private static byte[] rowkey(String s, String... other) {
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return Bytes.toBytes(sb.toString());
  }

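  /**
   * Enables the given table if it is not already enabled, tolerating the race where another caller
   * enables it first.
   */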
  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
    if (!admin.isTableEnabled(tableName)) {
      try {
        admin.enableTable(tableName);
      } catch (TableNotDisabledException ignored) {
        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
      }
    }
  }
}