001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.impl;
019
020import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot;
021import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
022import static org.apache.hadoop.hbase.backup.BackupInfo.withType;
023
024import edu.umd.cs.findbugs.annotations.Nullable;
025import java.io.Closeable;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.nio.charset.StandardCharsets;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Objects;
040import java.util.Set;
041import java.util.TreeMap;
042import java.util.TreeSet;
043import java.util.function.Predicate;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.commons.lang3.ArrayUtils;
047import org.apache.commons.lang3.StringUtils;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.Path;
050import org.apache.hadoop.hbase.Cell;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.NamespaceDescriptor;
054import org.apache.hadoop.hbase.NamespaceExistException;
055import org.apache.hadoop.hbase.ServerName;
056import org.apache.hadoop.hbase.TableExistsException;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotDisabledException;
059import org.apache.hadoop.hbase.backup.BackupInfo;
060import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
061import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
062import org.apache.hadoop.hbase.backup.BackupType;
063import org.apache.hadoop.hbase.client.Admin;
064import org.apache.hadoop.hbase.client.BufferedMutator;
065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
066import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
067import org.apache.hadoop.hbase.client.Connection;
068import org.apache.hadoop.hbase.client.Delete;
069import org.apache.hadoop.hbase.client.Get;
070import org.apache.hadoop.hbase.client.Put;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.SnapshotDescription;
075import org.apache.hadoop.hbase.client.Table;
076import org.apache.hadoop.hbase.client.TableDescriptor;
077import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
078import org.apache.hadoop.hbase.util.Bytes;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.yetus.audience.InterfaceAudience;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
085import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
086import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
087
088import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
090
091/**
092 * This class provides API to access backup system table<br>
093 * Backup system table schema:<br>
094 * <p>
095 * <ul>
096 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
097 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
098 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of
099 * include table; value=empty</li>
100 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
101 * timestamp]</li>
102 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
103 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
104 * name</li>
105 * </ul>
106 * </p>
107 */
108@InterfaceAudience.Private
109public final class BackupSystemTable implements Closeable {
110
111  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
112
113  static class WALItem {
114    String backupId;
115    String walFile;
116    String backupRoot;
117
118    WALItem(String backupId, String walFile, String backupRoot) {
119      this.backupId = backupId;
120      this.walFile = walFile;
121      this.backupRoot = backupRoot;
122    }
123
124    public String getBackupId() {
125      return backupId;
126    }
127
128    public String getWalFile() {
129      return walFile;
130    }
131
132    public String getBackupRoot() {
133      return backupRoot;
134    }
135
136    @Override
137    public String toString() {
138      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
139    }
140  }
141
142  /**
143   * Backup system table (main) name
144   */
145  private TableName tableName;
146
147  /**
148   * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
149   * separate table because we have to isolate general backup operations: create, merge etc from
150   * activity of RegionObserver, which controls process of a bulk loading
151   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
152   */
153  private TableName bulkLoadTableName;
154
155  /**
156   * Stores backup sessions (contexts)
157   */
158  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
159  /**
160   * Stores other meta
161   */
162  final static byte[] META_FAMILY = Bytes.toBytes("meta");
163  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
164  /**
165   * Connection to HBase cluster, shared among all instances
166   */
167  private final Connection connection;
168
169  private final static String BACKUP_INFO_PREFIX = "session:";
170  private final static String START_CODE_ROW = "startcode:";
171  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
172  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");
173
174  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
175  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
176
177  private final static String INCR_BACKUP_SET = "incrbackupset:";
178  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
179  private final static String RS_LOG_TS_PREFIX = "rslogts:";
180
181  private final static String BULK_LOAD_PREFIX = "bulk:";
182  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
183  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
184  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");
185
186  final static byte[] TBL_COL = Bytes.toBytes("tbl");
187  final static byte[] FAM_COL = Bytes.toBytes("fam");
188  final static byte[] PATH_COL = Bytes.toBytes("path");
189
190  private final static String SET_KEY_PREFIX = "backupset:";
191
192  // separator between BULK_LOAD_PREFIX and ordinals
193  private final static String BLK_LD_DELIM = ":";
194  private final static byte[] EMPTY_VALUE = new byte[] {};
195
196  // Safe delimiter in a string
197  private final static String NULL = "\u0000";
198
199  public BackupSystemTable(Connection conn) throws IOException {
200    this.connection = conn;
201    Configuration conf = this.connection.getConfiguration();
202    tableName = BackupSystemTable.getTableName(conf);
203    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
204    checkSystemTable();
205  }
206
207  private void checkSystemTable() throws IOException {
208    try (Admin admin = connection.getAdmin()) {
209      verifyNamespaceExists(admin);
210      Configuration conf = connection.getConfiguration();
211      if (!admin.tableExists(tableName)) {
212        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
213        createSystemTable(admin, backupHTD);
214      }
215      ensureTableEnabled(admin, tableName);
216      if (!admin.tableExists(bulkLoadTableName)) {
217        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
218        createSystemTable(admin, blHTD);
219      }
220      ensureTableEnabled(admin, bulkLoadTableName);
221      waitForSystemTable(admin, tableName);
222      waitForSystemTable(admin, bulkLoadTableName);
223    }
224  }
225
226  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
227    try {
228      admin.createTable(descriptor);
229    } catch (TableExistsException e) {
230      // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
231      // so may be subject to race conditions where one caller succeeds in creating the
232      // table and others fail because it now exists
233      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
234    }
235  }
236
237  private void verifyNamespaceExists(Admin admin) throws IOException {
238    String namespaceName = tableName.getNamespaceAsString();
239    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
240    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
241    boolean exists = false;
242    for (NamespaceDescriptor nsd : list) {
243      if (nsd.getName().equals(ns.getName())) {
244        exists = true;
245        break;
246      }
247    }
248    if (!exists) {
249      try {
250        admin.createNamespace(ns);
251      } catch (NamespaceExistException e) {
252        // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
253        // so may be subject to race conditions where one caller succeeds in creating the
254        // namespace and others fail because it now exists
255        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
256      }
257    }
258  }
259
260  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
261    // Return fast if the table is available and avoid a log message
262    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
263      return;
264    }
265    long TIMEOUT = 60000;
266    long startTime = EnvironmentEdgeManager.currentTime();
267    LOG.debug("Backup table {} is not present and available, waiting for it to become so",
268      tableName);
269    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
270      try {
271        Thread.sleep(100);
272      } catch (InterruptedException e) {
273        throw (IOException) new InterruptedIOException().initCause(e);
274      }
275      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
276        throw new IOException(
277          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
278      }
279    }
280    LOG.debug("Backup table {} exists and available", tableName);
281  }
282
  /**
   * No-op: the underlying {@link Connection} is shared among all instances and owned by the
   * caller, so there is nothing for this instance to release.
   */
  @Override
  public void close() {
    // do nothing
  }
287
288  /**
289   * Updates status (state) of a backup session in backup system table table
290   * @param info backup info
291   * @throws IOException exception
292   */
293  public void updateBackupInfo(BackupInfo info) throws IOException {
294    if (LOG.isTraceEnabled()) {
295      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
296        + " set status=" + info.getState());
297    }
298    try (Table table = connection.getTable(tableName)) {
299      Put put = createPutForBackupInfo(info);
300      table.put(put);
301    }
302  }
303
304  /*
305   * @param backupId the backup Id
306   * @return Map of rows to path of bulk loaded hfile
307   */
308  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
309    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
310    try (Table table = connection.getTable(bulkLoadTableName);
311      ResultScanner scanner = table.getScanner(scan)) {
312      Result res = null;
313      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
314      while ((res = scanner.next()) != null) {
315        res.advance();
316        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
317        for (Cell cell : res.listCells()) {
318          if (
319            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
320              BackupSystemTable.PATH_COL.length) == 0
321          ) {
322            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
323          }
324        }
325      }
326      return map;
327    }
328  }
329
330  /**
331   * Deletes backup status from backup system table table
332   * @param backupId backup id
333   * @throws IOException exception
334   */
335  public void deleteBackupInfo(String backupId) throws IOException {
336    if (LOG.isTraceEnabled()) {
337      LOG.trace("delete backup status in backup system table for " + backupId);
338    }
339    try (Table table = connection.getTable(tableName)) {
340      Delete del = createDeleteForBackupInfo(backupId);
341      table.delete(del);
342    }
343  }
344
345  /**
346   * Registers a bulk load.
347   * @param tableName     table name
348   * @param region        the region receiving hfile
349   * @param cfToHfilePath column family and associated hfiles
350   */
351  public void registerBulkLoad(TableName tableName, byte[] region,
352    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
353    if (LOG.isDebugEnabled()) {
354      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
355        cfToHfilePath.size());
356    }
357    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
358      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
359      bufferedMutator.mutate(puts);
360      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
361    }
362  }
363
364  /**
365   * Removes entries from the table that tracks all bulk loaded hfiles.
366   * @param rows the row keys of the entries to be deleted
367   */
368  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
369    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
370      List<Delete> deletes = new ArrayList<>();
371      for (byte[] row : rows) {
372        Delete del = new Delete(row);
373        deletes.add(del);
374        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
375      }
376      bufferedMutator.mutate(deletes);
377      LOG.debug("Deleted {} bulk load entries.", rows.size());
378    }
379  }
380
381  /**
382   * Reads all registered bulk loads.
383   */
384  public List<BulkLoad> readBulkloadRows() throws IOException {
385    Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
386    return processBulkLoadRowScan(scan);
387  }
388
389  /**
390   * Reads the registered bulk loads for the given tables.
391   */
392  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
393    List<BulkLoad> result = new ArrayList<>();
394    for (TableName table : tableList) {
395      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
396      result.addAll(processBulkLoadRowScan(scan));
397    }
398    return result;
399  }
400
401  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
402    List<BulkLoad> result = new ArrayList<>();
403    try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
404      ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
405      Result res;
406      while ((res = scanner.next()) != null) {
407        res.advance();
408        TableName table = null;
409        String fam = null;
410        String path = null;
411        String region = null;
412        byte[] row = null;
413        for (Cell cell : res.listCells()) {
414          row = CellUtil.cloneRow(cell);
415          String rowStr = Bytes.toString(row);
416          region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
417          if (
418            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
419              BackupSystemTable.TBL_COL.length) == 0
420          ) {
421            table = TableName.valueOf(CellUtil.cloneValue(cell));
422          } else if (
423            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
424              BackupSystemTable.FAM_COL.length) == 0
425          ) {
426            fam = Bytes.toString(CellUtil.cloneValue(cell));
427          } else if (
428            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
429              BackupSystemTable.PATH_COL.length) == 0
430          ) {
431            path = Bytes.toString(CellUtil.cloneValue(cell));
432          }
433        }
434        result.add(new BulkLoad(table, region, fam, path, row));
435        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
436      }
437    }
438    return result;
439  }
440
441  /**
442   * Reads backup status object (instance of backup info) from backup system table table
443   * @param backupId backup id
444   * @return Current status of backup session or null
445   */
446  public BackupInfo readBackupInfo(String backupId) throws IOException {
447    if (LOG.isTraceEnabled()) {
448      LOG.trace("read backup status from backup system table for: " + backupId);
449    }
450
451    try (Table table = connection.getTable(tableName)) {
452      Get get = createGetForBackupInfo(backupId);
453      Result res = table.get(get);
454      if (res.isEmpty()) {
455        return null;
456      }
457      return resultToBackupInfo(res);
458    }
459  }
460
461  /**
462   * Exclusive operations are: create, delete, merge
463   * @throws IOException if a table operation fails or an active backup exclusive operation is
464   *                     already underway
465   */
466  public void startBackupExclusiveOperation() throws IOException {
467    LOG.debug("Start new backup exclusive operation");
468
469    try (Table table = connection.getTable(tableName)) {
470      Put put = createPutForStartBackupSession();
471      // First try to put if row does not exist
472      if (
473        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
474          .ifNotExists().thenPut(put)
475      ) {
476        // Row exists, try to put if value == ACTIVE_SESSION_NO
477        if (
478          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
479            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
480        ) {
481          throw new ExclusiveOperationException();
482        }
483      }
484    }
485  }
486
487  private Put createPutForStartBackupSession() {
488    Put put = new Put(ACTIVE_SESSION_ROW);
489    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
490    return put;
491  }
492
493  public void finishBackupExclusiveOperation() throws IOException {
494    LOG.debug("Finish backup exclusive operation");
495
496    try (Table table = connection.getTable(tableName)) {
497      Put put = createPutForStopBackupSession();
498      if (
499        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
500          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
501      ) {
502        throw new IOException("There is no active backup exclusive operation");
503      }
504    }
505  }
506
507  private Put createPutForStopBackupSession() {
508    Put put = new Put(ACTIVE_SESSION_ROW);
509    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
510    return put;
511  }
512
513  /**
514   * Get the Region Servers log information after the last log roll from backup system table.
515   * @param backupRoot root directory path to backup
516   * @return RS log info
517   * @throws IOException exception
518   */
519  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
520    throws IOException {
521    LOG.trace("read region server last roll log result to backup system table");
522
523    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
524
525    try (Table table = connection.getTable(tableName);
526      ResultScanner scanner = table.getScanner(scan)) {
527      Result res;
528      HashMap<String, Long> rsTimestampMap = new HashMap<>();
529      while ((res = scanner.next()) != null) {
530        res.advance();
531        Cell cell = res.current();
532        byte[] row = CellUtil.cloneRow(cell);
533        String server = getServerNameForReadRegionServerLastLogRollResult(row);
534        byte[] data = CellUtil.cloneValue(cell);
535        rsTimestampMap.put(server, Bytes.toLong(data));
536      }
537      return rsTimestampMap;
538    }
539  }
540
541  /**
542   * Writes Region Server last roll log result (timestamp) to backup system table table
543   * @param server     Region Server name
544   * @param ts         last log timestamp
545   * @param backupRoot root directory path to backup
546   * @throws IOException exception
547   */
548  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
549    throws IOException {
550    LOG.trace("write region server last roll log result to backup system table");
551
552    try (Table table = connection.getTable(tableName)) {
553      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
554      table.put(put);
555    }
556  }
557
558  /**
559   * Retrieve all table names that are part of any known completed backup
560   */
561  public Set<TableName> getTablesIncludedInBackups() throws IOException {
562    // Incremental backups have the same tables as the preceding full backups
563    List<BackupInfo> infos =
564      getBackupHistory(withState(BackupState.COMPLETE), withType(BackupType.FULL));
565    return infos.stream().flatMap(info -> info.getTableNames().stream())
566      .collect(Collectors.toSet());
567  }
568
569  /**
570   * Goes through all backup history corresponding to the provided root folder, and collects all
571   * backup info mentioning each of the provided tables.
572   * @param set        the tables for which to collect the {@code BackupInfo}
573   * @param backupRoot backup destination path to retrieve backup history for
574   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
575   *         least one {@code BackupInfo}
576   * @throws IOException if getting the backup history fails
577   */
578  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
579    String backupRoot) throws IOException {
580    List<BackupInfo> history = getBackupHistory(withRoot(backupRoot));
581    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
582    for (BackupInfo info : history) {
583      List<TableName> tables = info.getTableNames();
584      for (TableName tableName : tables) {
585        if (set.contains(tableName)) {
586          List<BackupInfo> list =
587            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
588          list.add(info);
589        }
590      }
591    }
592    return tableHistoryMap;
593  }
594
595  /**
596   * Get all backup information passing the given filters, ordered by descending backupId. I.e. from
597   * newest to oldest.
598   */
599  public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException {
600    return getBackupHistory(Order.NEW_TO_OLD, Integer.MAX_VALUE, toInclude);
601  }
602
603  /**
604   * Retrieves the first n entries of the sorted, filtered list of backup infos.
605   * @param order desired ordering of the results.
606   * @param n     number of entries to return
607   */
608  public List<BackupInfo> getBackupHistory(Order order, int n, BackupInfo.Filter... toInclude)
609    throws IOException {
610    Preconditions.checkArgument(n >= 0, "n should be >= 0");
611    LOG.trace("get backup infos from backup system table");
612
613    if (n == 0) {
614      return Collections.emptyList();
615    }
616
617    Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude)
618      .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true);
619
620    Scan scan = createScanForBackupHistory(order);
621    List<BackupInfo> list = new ArrayList<>();
622
623    try (Table table = connection.getTable(tableName);
624      ResultScanner scanner = table.getScanner(scan)) {
625      Result res;
626      while ((res = scanner.next()) != null) {
627        res.advance();
628        BackupInfo context = cellToBackupInfo(res.current());
629        if (combinedPredicate.test(context)) {
630          list.add(context);
631          if (list.size() == n) {
632            break;
633          }
634        }
635      }
636      return list;
637    }
638  }
639
640  /**
641   * Write the current timestamps for each regionserver to backup system table after a successful
642   * full or incremental backup. The saved timestamp is of the last log file that was backed up
643   * already.
644   * @param tables        tables
645   * @param newTimestamps timestamps
646   * @param backupRoot    root directory path to backup
647   * @throws IOException exception
648   */
649  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
650    String backupRoot) throws IOException {
651    if (LOG.isTraceEnabled()) {
652      LOG.trace("write RS log time stamps to backup system table for tables ["
653        + StringUtils.join(tables, ",") + "]");
654    }
655    List<Put> puts = new ArrayList<>();
656    for (TableName table : tables) {
657      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
658      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
659      puts.add(put);
660    }
661    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
662      bufferedMutator.mutate(puts);
663    }
664  }
665
666  /**
667   * Read the timestamp for each region server log after the last successful backup. Each table has
668   * its own set of the timestamps. The info is stored for each table as a concatenated string of
669   * rs->timestapmp
670   * @param backupRoot root directory path to backup
671   * @return the timestamp for each region server. key: tableName value:
672   *         RegionServer,PreviousTimeStamp
673   * @throws IOException exception
674   */
675  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
676    throws IOException {
677    if (LOG.isTraceEnabled()) {
678      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
679    }
680
681    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
682
683    Scan scan = createScanForReadLogTimestampMap(backupRoot);
684    try (Table table = connection.getTable(tableName);
685      ResultScanner scanner = table.getScanner(scan)) {
686      Result res;
687      while ((res = scanner.next()) != null) {
688        res.advance();
689        Cell cell = res.current();
690        byte[] row = CellUtil.cloneRow(cell);
691        String tabName = getTableNameForReadLogTimestampMap(row);
692        TableName tn = TableName.valueOf(tabName);
693        byte[] data = CellUtil.cloneValue(cell);
694        if (data == null) {
695          throw new IOException("Data of last backup data from backup system table "
696            + "is empty. Create a backup first.");
697        }
698        if (data != null && data.length > 0) {
699          HashMap<String, Long> lastBackup =
700            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
701          tableTimestampMap.put(tn, lastBackup);
702        }
703      }
704      return tableTimestampMap;
705    }
706  }
707
708  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
709    Map<String, Long> map) {
710    BackupProtos.TableServerTimestamp.Builder tstBuilder =
711      BackupProtos.TableServerTimestamp.newBuilder();
712    tstBuilder
713      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
714
715    for (Entry<String, Long> entry : map.entrySet()) {
716      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
717      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
718      ServerName sn = ServerName.parseServerName(entry.getKey());
719      snBuilder.setHostName(sn.getHostname());
720      snBuilder.setPort(sn.getPort());
721      builder.setServerName(snBuilder.build());
722      builder.setTimestamp(entry.getValue());
723      tstBuilder.addServerTimestamp(builder.build());
724    }
725
726    return tstBuilder.build();
727  }
728
729  private HashMap<String, Long>
730    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {
731
732    HashMap<String, Long> map = new HashMap<>();
733    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
734    for (BackupProtos.ServerTimestamp st : list) {
735      ServerName sn =
736        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
737      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
738    }
739    return map;
740  }
741
742  /**
743   * Return the current tables covered by incremental backup.
744   * @param backupRoot root directory path to backup
745   * @return set of tableNames
746   * @throws IOException exception
747   */
748  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
749    LOG.trace("get incremental backup table set from backup system table");
750
751    TreeSet<TableName> set = new TreeSet<>();
752
753    try (Table table = connection.getTable(tableName)) {
754      Get get = createGetForIncrBackupTableSet(backupRoot);
755      Result res = table.get(get);
756      if (res.isEmpty()) {
757        return set;
758      }
759      List<Cell> cells = res.listCells();
760      for (Cell cell : cells) {
761        // qualifier = table name - we use table names as qualifiers
762        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
763      }
764      return set;
765    }
766  }
767
768  /**
769   * Add tables to global incremental backup set
770   * @param tables     set of tables
771   * @param backupRoot root directory path to backup
772   * @throws IOException exception
773   */
774  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
775    throws IOException {
776    if (LOG.isTraceEnabled()) {
777      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
778        + " tables [" + StringUtils.join(tables, " ") + "]");
779    }
780    if (LOG.isDebugEnabled()) {
781      tables.forEach(table -> LOG.debug(Objects.toString(table)));
782    }
783    try (Table table = connection.getTable(tableName)) {
784      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
785      table.put(put);
786    }
787  }
788
789  /**
790   * Deletes incremental backup set for a backup destination
791   * @param backupRoot backup root
792   */
793  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
794    if (LOG.isTraceEnabled()) {
795      LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
796    }
797    try (Table table = connection.getTable(tableName)) {
798      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
799      table.delete(delete);
800    }
801  }
802
803  /**
804   * Checks if we have at least one backup session in backup system table This API is used by
805   * BackupLogCleaner
806   * @return true, if at least one session exists in backup system table
807   * @throws IOException exception
808   */
809  public boolean hasBackupSessions() throws IOException {
810    LOG.trace("Has backup sessions from backup system table");
811
812    Scan scan = createScanForBackupHistory(Order.OLD_TO_NEW);
813    scan.setCaching(1);
814    try (Table table = connection.getTable(tableName);
815      ResultScanner scanner = table.getScanner(scan)) {
816      return scanner.next() != null;
817    }
818  }
819
820  /**
821   * BACKUP SETS
822   */
823
824  /**
825   * Get backup set list
826   * @return backup set list
827   * @throws IOException if a table or scanner operation fails
828   */
829  public List<String> listBackupSets() throws IOException {
830    LOG.trace("Backup set list");
831
832    List<String> list = new ArrayList<>();
833    try (Table table = connection.getTable(tableName)) {
834      Scan scan = createScanForBackupSetList();
835      scan.readVersions(1);
836      try (ResultScanner scanner = table.getScanner(scan)) {
837        Result res;
838        while ((res = scanner.next()) != null) {
839          res.advance();
840          list.add(cellKeyToBackupSetName(res.current()));
841        }
842        return list;
843      }
844    }
845  }
846
847  /**
848   * Get backup set description (list of tables)
849   * @param name set's name
850   * @return list of tables in a backup set
851   * @throws IOException if a table operation fails
852   */
853  public List<TableName> describeBackupSet(String name) throws IOException {
854    if (LOG.isTraceEnabled()) {
855      LOG.trace(" Backup set describe: " + name);
856    }
857    try (Table table = connection.getTable(tableName)) {
858      Get get = createGetForBackupSet(name);
859      Result res = table.get(get);
860      if (res.isEmpty()) {
861        return null;
862      }
863      res.advance();
864      String[] tables = cellValueToBackupSet(res.current());
865      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
866        .collect(Collectors.toList());
867    }
868  }
869
870  /**
871   * Add backup set (list of tables)
872   * @param name      set name
873   * @param newTables list of tables, comma-separated
874   * @throws IOException if a table operation fails
875   */
876  public void addToBackupSet(String name, String[] newTables) throws IOException {
877    if (LOG.isTraceEnabled()) {
878      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
879    }
880    String[] union = null;
881    try (Table table = connection.getTable(tableName)) {
882      Get get = createGetForBackupSet(name);
883      Result res = table.get(get);
884      if (res.isEmpty()) {
885        union = newTables;
886      } else {
887        res.advance();
888        String[] tables = cellValueToBackupSet(res.current());
889        union = merge(tables, newTables);
890      }
891      Put put = createPutForBackupSet(name, union);
892      table.put(put);
893    }
894  }
895
896  /**
897   * Remove tables from backup set (list of tables)
898   * @param name     set name
899   * @param toRemove list of tables
900   * @throws IOException if a table operation or deleting the backup set fails
901   */
902  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
903    if (LOG.isTraceEnabled()) {
904      LOG.trace(
905        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
906    }
907    String[] disjoint;
908    String[] tables;
909    try (Table table = connection.getTable(tableName)) {
910      Get get = createGetForBackupSet(name);
911      Result res = table.get(get);
912      if (res.isEmpty()) {
913        LOG.warn("Backup set '" + name + "' not found.");
914        return;
915      } else {
916        res.advance();
917        tables = cellValueToBackupSet(res.current());
918        disjoint = disjoin(tables, toRemove);
919      }
920      if (disjoint.length > 0 && disjoint.length != tables.length) {
921        Put put = createPutForBackupSet(name, disjoint);
922        table.put(put);
923      } else if (disjoint.length == tables.length) {
924        LOG.warn("Backup set '" + name + "' does not contain tables ["
925          + StringUtils.join(toRemove, " ") + "]");
926      } else { // disjoint.length == 0 and tables.length >0
927        // Delete backup set
928        LOG.info("Backup set '" + name + "' is empty. Deleting.");
929        deleteBackupSet(name);
930      }
931    }
932  }
933
934  private String[] merge(String[] existingTables, String[] newTables) {
935    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
936    tables.addAll(Arrays.asList(newTables));
937    return tables.toArray(new String[0]);
938  }
939
940  private String[] disjoin(String[] existingTables, String[] toRemove) {
941    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
942    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
943    return tables.toArray(new String[0]);
944  }
945
946  /**
947   * Delete backup set
948   * @param name set's name
949   * @throws IOException if getting or deleting the table fails
950   */
951  public void deleteBackupSet(String name) throws IOException {
952    if (LOG.isTraceEnabled()) {
953      LOG.trace(" Backup set delete: " + name);
954    }
955    try (Table table = connection.getTable(tableName)) {
956      Delete del = createDeleteForBackupSet(name);
957      table.delete(del);
958    }
959  }
960
961  /**
962   * Get backup system table descriptor
963   * @return table's descriptor
964   */
965  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
966    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
967
968    ColumnFamilyDescriptorBuilder colBuilder =
969      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
970
971    colBuilder.setMaxVersions(1);
972    Configuration config = HBaseConfiguration.create();
973    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
974      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
975    colBuilder.setTimeToLive(ttl);
976
977    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
978    builder.setColumnFamily(colSessionsDesc);
979
980    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
981    colBuilder.setTimeToLive(ttl);
982    builder.setColumnFamily(colBuilder.build());
983    return builder.build();
984  }
985
986  public static TableName getTableName(Configuration conf) {
987    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
988      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
989    return TableName.valueOf(name);
990  }
991
992  public static String getTableNameAsString(Configuration conf) {
993    return getTableName(conf).getNameAsString();
994  }
995
996  public static String getSnapshotName(Configuration conf) {
997    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
998  }
999
1000  /**
1001   * Get backup system table descriptor
1002   * @return table's descriptor
1003   */
1004  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
1005    TableDescriptorBuilder builder =
1006      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
1007
1008    ColumnFamilyDescriptorBuilder colBuilder =
1009      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
1010    colBuilder.setMaxVersions(1);
1011    Configuration config = HBaseConfiguration.create();
1012    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
1013      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1014    colBuilder.setTimeToLive(ttl);
1015    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
1016    builder.setColumnFamily(colSessionsDesc);
1017    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
1018    colBuilder.setTimeToLive(ttl);
1019    builder.setColumnFamily(colBuilder.build());
1020    return builder.build();
1021  }
1022
1023  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
1024    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
1025      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
1026    return TableName.valueOf(name);
1027  }
1028
1029  /**
1030   * Creates Put operation for a given backup info object
1031   * @param context backup info
1032   * @return put operation
1033   * @throws IOException exception
1034   */
1035  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
1036    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
1037    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
1038      context.toByteArray());
1039    return put;
1040  }
1041
1042  /**
1043   * Creates Get operation for a given backup id
1044   * @param backupId backup's ID
1045   * @return get operation
1046   * @throws IOException exception
1047   */
1048  private Get createGetForBackupInfo(String backupId) throws IOException {
1049    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
1050    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1051    get.readVersions(1);
1052    return get;
1053  }
1054
1055  /**
1056   * Creates Delete operation for a given backup id
1057   * @param backupId backup's ID
1058   * @return delete operation
1059   */
1060  private Delete createDeleteForBackupInfo(String backupId) {
1061    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
1062    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1063    return del;
1064  }
1065
1066  /**
1067   * Converts Result to BackupInfo
1068   * @param res HBase result
1069   * @return backup info instance
1070   * @throws IOException exception
1071   */
1072  private BackupInfo resultToBackupInfo(Result res) throws IOException {
1073    res.advance();
1074    Cell cell = res.current();
1075    return cellToBackupInfo(cell);
1076  }
1077
1078  /**
1079   * Creates Get to retrieve incremental backup table set from backup system table
1080   * @return get operation
1081   * @throws IOException exception
1082   */
1083  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
1084    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
1085    get.addFamily(BackupSystemTable.META_FAMILY);
1086    get.readVersions(1);
1087    return get;
1088  }
1089
1090  /**
1091   * Creates Put to store incremental backup table set
1092   * @param tables tables
1093   * @return put operation
1094   */
1095  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
1096    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
1097    for (TableName table : tables) {
1098      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
1099        EMPTY_VALUE);
1100    }
1101    return put;
1102  }
1103
1104  /**
1105   * Creates Delete for incremental backup table set
1106   * @param backupRoot backup root
1107   * @return delete operation
1108   */
1109  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
1110    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
1111    delete.addFamily(BackupSystemTable.META_FAMILY);
1112    return delete;
1113  }
1114
1115  /**
1116   * Creates Scan operation to load backup history
1117   * @param order order of the scan results
1118   * @return scan operation
1119   */
1120  private Scan createScanForBackupHistory(Order order) {
1121    Scan scan = new Scan();
1122    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
1123    if (order == Order.NEW_TO_OLD) {
1124      byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1125      stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1126      scan.setReversed(true);
1127      scan.withStartRow(stopRow, false);
1128      scan.withStopRow(startRow);
1129    } else if (order == Order.OLD_TO_NEW) {
1130      scan.setStartStopRowForPrefixScan(startRow);
1131    } else {
1132      throw new IllegalArgumentException("Unsupported order: " + order);
1133    }
1134    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1135    scan.readVersions(1);
1136    return scan;
1137  }
1138
1139  /**
1140   * Converts cell to backup info instance.
1141   * @param current current cell
1142   * @return backup backup info instance
1143   * @throws IOException exception
1144   */
1145  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
1146    byte[] data = CellUtil.cloneValue(current);
1147    return BackupInfo.fromByteArray(data);
1148  }
1149
1150  /**
1151   * Creates Put to write RS last roll log timestamp map
1152   * @param table table
1153   * @param smap  map, containing RS:ts
1154   * @return put operation
1155   */
1156  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
1157    String backupRoot) {
1158    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
1159    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
1160    return put;
1161  }
1162
1163  /**
1164   * Creates Scan to load table-> { RS -> ts} map of maps
1165   * @return scan operation
1166   */
1167  private Scan createScanForReadLogTimestampMap(String backupRoot) {
1168    Scan scan = new Scan();
1169    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
1170    scan.addFamily(BackupSystemTable.META_FAMILY);
1171
1172    return scan;
1173  }
1174
1175  /**
1176   * Get table name from rowkey
1177   * @param cloneRow rowkey
1178   * @return table name
1179   */
1180  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
1181    String s = Bytes.toString(cloneRow);
1182    int index = s.lastIndexOf(NULL);
1183    return s.substring(index + 1);
1184  }
1185
1186  /**
1187   * Creates Put to store RS last log result
1188   * @param server    server name
1189   * @param timestamp log roll result (timestamp)
1190   * @return put operation
1191   */
1192  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
1193    String backupRoot) {
1194    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
1195    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
1196      Bytes.toBytes(timestamp));
1197    return put;
1198  }
1199
1200  /**
1201   * Creates Scan operation to load last RS log roll results
1202   * @return scan operation
1203   */
1204  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
1205    Scan scan = new Scan();
1206    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
1207    scan.addFamily(BackupSystemTable.META_FAMILY);
1208    scan.readVersions(1);
1209
1210    return scan;
1211  }
1212
1213  /**
1214   * Get server's name from rowkey
1215   * @param row rowkey
1216   * @return server's name
1217   */
1218  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
1219    String s = Bytes.toString(row);
1220    int index = s.lastIndexOf(NULL);
1221    return s.substring(index + 1);
1222  }
1223
1224  /**
1225   * Creates Put's for bulk loads.
1226   */
1227  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
1228    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
1229    List<Put> puts = new ArrayList<>();
1230    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
1231      for (Path path : entry.getValue()) {
1232        String file = path.toString();
1233        int lastSlash = file.lastIndexOf("/");
1234        String filename = file.substring(lastSlash + 1);
1235        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1236          Bytes.toString(region), BLK_LD_DELIM, filename));
1237        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1238        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
1239        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1240        puts.add(put);
1241        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
1242      }
1243    }
1244    return puts;
1245  }
1246
1247  public static void snapshot(Connection conn) throws IOException {
1248    try (Admin admin = conn.getAdmin()) {
1249      Configuration conf = conn.getConfiguration();
1250      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
1251    }
1252  }
1253
1254  public static void restoreFromSnapshot(Connection conn) throws IOException {
1255    Configuration conf = conn.getConfiguration();
1256    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
1257    try (Admin admin = conn.getAdmin()) {
1258      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1259      if (snapshotExists(admin, snapshotName)) {
1260        admin.restoreBackupSystemTable(snapshotName);
1261        LOG.debug("Done restoring backup system table");
1262      } else {
1263        // Snapshot does not exists, i.e completeBackup failed after
1264        // deleting backup system table snapshot
1265        // In this case we log WARN and proceed
1266        LOG.warn(
1267          "Could not restore backup system table. Snapshot " + snapshotName + " does not exists.");
1268      }
1269    }
1270  }
1271
1272  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
1273    List<SnapshotDescription> list = admin.listSnapshots();
1274    for (SnapshotDescription desc : list) {
1275      if (desc.getName().equals(snapshotName)) {
1276        return true;
1277      }
1278    }
1279    return false;
1280  }
1281
1282  public static boolean snapshotExists(Connection conn) throws IOException {
1283    return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
1284  }
1285
1286  public static void deleteSnapshot(Connection conn) throws IOException {
1287    Configuration conf = conn.getConfiguration();
1288    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
1289    try (Admin admin = conn.getAdmin()) {
1290      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1291      if (snapshotExists(admin, snapshotName)) {
1292        admin.deleteSnapshot(snapshotName);
1293        LOG.debug("Done deleting backup system table snapshot");
1294      } else {
1295        LOG.error("Snapshot " + snapshotName + " does not exists");
1296      }
1297    }
1298  }
1299
1300  private Put createPutForDeleteOperation(String[] backupIdList) {
1301    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1302    Put put = new Put(DELETE_OP_ROW);
1303    put.addColumn(META_FAMILY, FAM_COL, value);
1304    return put;
1305  }
1306
1307  private Delete createDeleteForBackupDeleteOperation() {
1308    Delete delete = new Delete(DELETE_OP_ROW);
1309    delete.addFamily(META_FAMILY);
1310    return delete;
1311  }
1312
1313  private Get createGetForDeleteOperation() {
1314    Get get = new Get(DELETE_OP_ROW);
1315    get.addFamily(META_FAMILY);
1316    return get;
1317  }
1318
1319  public void startDeleteOperation(String[] backupIdList) throws IOException {
1320    if (LOG.isTraceEnabled()) {
1321      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
1322    }
1323    Put put = createPutForDeleteOperation(backupIdList);
1324    try (Table table = connection.getTable(tableName)) {
1325      table.put(put);
1326    }
1327  }
1328
1329  public void finishDeleteOperation() throws IOException {
1330    LOG.trace("Finsih delete operation for backup ids");
1331
1332    Delete delete = createDeleteForBackupDeleteOperation();
1333    try (Table table = connection.getTable(tableName)) {
1334      table.delete(delete);
1335    }
1336  }
1337
1338  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
1339    LOG.trace("Get delete operation for backup ids");
1340
1341    Get get = createGetForDeleteOperation();
1342    try (Table table = connection.getTable(tableName)) {
1343      Result res = table.get(get);
1344      if (res.isEmpty()) {
1345        return null;
1346      }
1347      Cell cell = res.listCells().get(0);
1348      byte[] val = CellUtil.cloneValue(cell);
1349      if (val.length == 0) {
1350        return null;
1351      }
1352      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1353        .toArray(String[]::new);
1354    }
1355  }
1356
1357  private Put createPutForMergeOperation(String[] backupIdList) {
1358    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1359    Put put = new Put(MERGE_OP_ROW);
1360    put.addColumn(META_FAMILY, FAM_COL, value);
1361    return put;
1362  }
1363
1364  public boolean isMergeInProgress() throws IOException {
1365    Get get = new Get(MERGE_OP_ROW);
1366    try (Table table = connection.getTable(tableName)) {
1367      Result res = table.get(get);
1368      return !res.isEmpty();
1369    }
1370  }
1371
1372  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
1373    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
1374    Put put = new Put(MERGE_OP_ROW);
1375    put.addColumn(META_FAMILY, PATH_COL, value);
1376    return put;
1377  }
1378
1379  private Delete createDeleteForBackupMergeOperation() {
1380    Delete delete = new Delete(MERGE_OP_ROW);
1381    delete.addFamily(META_FAMILY);
1382    return delete;
1383  }
1384
1385  private Get createGetForMergeOperation() {
1386    Get get = new Get(MERGE_OP_ROW);
1387    get.addFamily(META_FAMILY);
1388    return get;
1389  }
1390
1391  public void startMergeOperation(String[] backupIdList) throws IOException {
1392    if (LOG.isTraceEnabled()) {
1393      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
1394    }
1395    Put put = createPutForMergeOperation(backupIdList);
1396    try (Table table = connection.getTable(tableName)) {
1397      table.put(put);
1398    }
1399  }
1400
1401  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
1402    if (LOG.isTraceEnabled()) {
1403      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
1404    }
1405    Put put = createPutForUpdateTablesForMerge(tables);
1406    try (Table table = connection.getTable(tableName)) {
1407      table.put(put);
1408    }
1409  }
1410
1411  public void finishMergeOperation() throws IOException {
1412    LOG.trace("Finish merge operation for backup ids");
1413
1414    Delete delete = createDeleteForBackupMergeOperation();
1415    try (Table table = connection.getTable(tableName)) {
1416      table.delete(delete);
1417    }
1418  }
1419
1420  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
1421    LOG.trace("Get backup ids for merge operation");
1422
1423    Get get = createGetForMergeOperation();
1424    try (Table table = connection.getTable(tableName)) {
1425      Result res = table.get(get);
1426      if (res.isEmpty()) {
1427        return null;
1428      }
1429      Cell cell = res.listCells().get(0);
1430      byte[] val = CellUtil.cloneValue(cell);
1431      if (val.length == 0) {
1432        return null;
1433      }
1434      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1435        .toArray(String[]::new);
1436    }
1437  }
1438
1439  /**
1440   * Creates a scan to read all registered bulk loads for the given table, or for all tables if
1441   * {@code table} is {@code null}.
1442   */
1443  static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) {
1444    Scan scan = new Scan();
1445    byte[] startRow = table == null
1446      ? BULK_LOAD_PREFIX_BYTES
1447      : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
1448    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1449    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1450    scan.withStartRow(startRow);
1451    scan.withStopRow(stopRow);
1452    scan.addFamily(BackupSystemTable.META_FAMILY);
1453    scan.readVersions(1);
1454    return scan;
1455  }
1456
1457  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
1458    // format is bulk : namespace : table : region : file
1459    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
1460  }
1461
1462  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
1463    // format is bulk : namespace : table : region : file
1464    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
1465    Iterator<String> i = parts.iterator();
1466    int idx = 3;
1467    if (parts.size() == 4) {
1468      // the table is in default namespace
1469      idx = 2;
1470    }
1471    String region = Iterators.get(i, idx);
1472    LOG.debug("bulk row string " + rowStr + " region " + region);
1473    return region;
1474  }
1475
1476  /*
1477   * Used to query bulk loaded hfiles which have been copied by incremental backup
1478   * @param backupId the backup Id. It can be null when querying for all tables
1479   * @return the Scan object
1480   * @deprecated This method is broken if a backupId is specified - see HBASE-28715
1481   */
1482  static Scan createScanForBulkLoadedFiles(String backupId) {
1483    Scan scan = new Scan();
1484    byte[] startRow =
1485      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
1486    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1487    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1488    scan.withStartRow(startRow);
1489    scan.withStopRow(stopRow);
1490    scan.addFamily(BackupSystemTable.META_FAMILY);
1491    scan.readVersions(1);
1492    return scan;
1493  }
1494
1495  /**
1496   * Creates Scan operation to load backup set list
1497   * @return scan operation
1498   */
1499  private Scan createScanForBackupSetList() {
1500    Scan scan = new Scan();
1501    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
1502    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1503    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1504    scan.withStartRow(startRow);
1505    scan.withStopRow(stopRow);
1506    scan.addFamily(BackupSystemTable.META_FAMILY);
1507    return scan;
1508  }
1509
1510  /**
1511   * Creates Get operation to load backup set content
1512   * @return get operation
1513   */
1514  private Get createGetForBackupSet(String name) {
1515    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
1516    get.addFamily(BackupSystemTable.META_FAMILY);
1517    return get;
1518  }
1519
1520  /**
1521   * Creates Delete operation to delete backup set content
1522   * @param name backup set's name
1523   * @return delete operation
1524   */
1525  private Delete createDeleteForBackupSet(String name) {
1526    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
1527    del.addFamily(BackupSystemTable.META_FAMILY);
1528    return del;
1529  }
1530
1531  /**
1532   * Creates Put operation to update backup set content
1533   * @param name   backup set's name
1534   * @param tables list of tables
1535   * @return put operation
1536   */
1537  private Put createPutForBackupSet(String name, String[] tables) {
1538    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
1539    byte[] value = convertToByteArray(tables);
1540    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
1541    return put;
1542  }
1543
1544  private byte[] convertToByteArray(String[] tables) {
1545    return Bytes.toBytes(StringUtils.join(tables, ","));
1546  }
1547
1548  /**
1549   * Converts cell to backup set list.
1550   * @param current current cell
1551   * @return backup set as array of table names
1552   */
1553  private String[] cellValueToBackupSet(Cell current) {
1554    byte[] data = CellUtil.cloneValue(current);
1555    if (!ArrayUtils.isEmpty(data)) {
1556      return Bytes.toString(data).split(",");
1557    }
1558    return new String[0];
1559  }
1560
1561  /**
1562   * Converts cell key to backup set name.
1563   * @param current current cell
1564   * @return backup set name
1565   */
1566  private String cellKeyToBackupSetName(Cell current) {
1567    byte[] data = CellUtil.cloneRow(current);
1568    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
1569  }
1570
1571  private static byte[] rowkey(String s, String... other) {
1572    StringBuilder sb = new StringBuilder(s);
1573    for (String ss : other) {
1574      sb.append(ss);
1575    }
1576    return Bytes.toBytes(sb.toString());
1577  }
1578
1579  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
1580    if (!admin.isTableEnabled(tableName)) {
1581      try {
1582        admin.enableTable(tableName);
1583      } catch (TableNotDisabledException ignored) {
1584        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
1585      }
1586    }
1587  }
1588
  /** Sort order for backup-history scans. */
  public enum Order {
    /**
     * Old backups first, most recent last. I.e. sorted by ascending backupId.
     */
    OLD_TO_NEW,
    /**
     * New backups first, oldest last. I.e. sorted by descending backupId.
     */
    NEW_TO_OLD
  }
1599}