/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot;
import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
import static org.apache.hadoop.hbase.backup.BackupInfo.withType;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 * Backup system table schema:<br>
 * <ul>
 * <li>1. Backup sessions rowkey= "session:"+backupId; value = serialized BackupInfo</li>
 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; table="meta:"+tablename of
 * included table; value=empty</li>
 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
 * timestamp]</li>
 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ul>
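 * <p>
 * A minimal usage sketch (illustrative only; the configuration object and backup id here are
 * assumptions, not part of this class):
 *
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *   BackupSystemTable systemTable = new BackupSystemTable(conn)) {
 *   BackupInfo info = systemTable.readBackupInfo("backup_1700000000000");
 * }
 * }</pre>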
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk-loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge, etc.)
   * from the activity of the RegionObserver, which tracks the bulk loading process; see
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}.
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";
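  // Illustrative example: rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, tableName) yields
  // "trslm:" + backupRoot + NUL + tableName; since the NUL character (U+0000) cannot occur in a
  // root path or table name, key components can be recovered with lastIndexOf(NULL).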

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        createSystemTable(admin, backupHTD);
      }
      ensureTableEnabled(admin, tableName);
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        createSystemTable(admin, blHTD);
      }
      ensureTableEnabled(admin, bulkLoadTableName);
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
    try {
      admin.createTable(descriptor);
    } catch (TableExistsException e) {
      // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
      // so it may be subject to race conditions where one caller succeeds in creating the
      // table and others fail because it now exists
      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      try {
        admin.createNamespace(ns);
      } catch (NamespaceExistException e) {
        // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
        // so it may be subject to race conditions where one caller succeeds in creating the
        // namespace and others fail because it now exists
        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
      }
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    long TIMEOUT = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not yet present and available, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException().initCause(e);
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
        throw new IOException(
          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates the status (state) of a backup session in the backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /**
   * Deletes backup status from the backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /**
   * Registers a bulk load.
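   * <p>
   * A minimal sketch of the expected argument shape (names and paths here are illustrative):
   *
   * <pre>{@code
   * Map<byte[], List<Path>> cfToHfilePath = new TreeMap<>(Bytes.BYTES_COMPARATOR);
   * cfToHfilePath.put(Bytes.toBytes("cf"), Arrays.asList(new Path("/staging/cf/hfile1")));
   * systemTable.registerBulkLoad(TableName.valueOf("ns:tbl"), regionEncodedName, cfToHfilePath);
   * }</pre>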
   * @param tableName     table name
   * @param region        the region receiving hfile
   * @param cfToHfilePath column family and associated hfiles
   */
  public void registerBulkLoad(TableName tableName, byte[] region,
    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
        cfToHfilePath.size());
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
      bufferedMutator.mutate(puts);
      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
    }
  }

  /**
   * Removes entries from the table that tracks all bulk loaded hfiles.
   * @param rows the row keys of the entries to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Delete> deletes = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        deletes.add(del);
        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
      }
      bufferedMutator.mutate(deletes);
      LOG.debug("Deleted {} bulk load entries.", rows.size());
    }
  }

  /**
   * Reads all registered bulk loads.
   */
  public List<BulkLoad> readBulkloadRows() throws IOException {
    Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(null);
    return processBulkLoadRowScan(scan);
  }

  /**
   * Reads the registered bulk loads for the given tables.
   */
  public List<BulkLoad> readBulkloadRows(Collection<TableName> tableList) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    for (TableName table : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
      result.addAll(processBulkLoadRowScan(scan));
    }
    return result;
  }

  private List<BulkLoad> processBulkLoadRowScan(Scan scan) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName table = null;
        String fam = null;
        String path = null;
        String region = null;
        byte[] row = null;
        for (Cell cell : res.listCells()) {
          row = CellUtil.cloneRow(cell);
          String rowStr = Bytes.toString(row);
          region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            table = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = Bytes.toString(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        result.add(new BulkLoad(table, region, fam, path, row));
        LOG.debug("Found bulk load entry for table {}, family {}: {}", table, fam, path);
      }
    }
    return result;
  }

  /**
   * Reads a backup status object (instance of backup info) from the backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in HBase or the stored value has length 0; both cases indicate that no
   * successful backup has completed so far.
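   * <p>
   * Illustrative use (the root path is an assumption):
   *
   * <pre>{@code
   * String startCode = systemTable.readBackupStartCode("hdfs://host:8020/backup");
   * if (startCode == null) {
   *   // no completed backup under this root yet
   * }
   * }</pre>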
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val, StandardCharsets.UTF_8);
    }
  }

  /**
   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
   * @param startCode  start code
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Exclusive operations are: create, delete, merge
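   * <p>
   * Typically paired with {@link #finishBackupExclusiveOperation()}; a sketch:
   *
   * <pre>{@code
   * systemTable.startBackupExclusiveOperation();
   * try {
   *   // run the create / delete / merge work here
   * } finally {
   *   systemTable.finishBackupExclusiveOperation();
   * }
   * }</pre>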
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the region server log information after the last log roll from the backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last roll log result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to the backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all backup information passing the given filters, ordered by descending start time, i.e.
   * from newest to oldest.
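   * <p>
   * Filters compose with logical AND; for example (values are illustrative):
   *
   * <pre>{@code
   * List<BackupInfo> completedFulls =
   *   systemTable.getBackupHistory(withState(BackupState.COMPLETE), withType(BackupType.FULL));
   * }</pre>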
   */
  public List<BackupInfo> getBackupHistory(BackupInfo.Filter... toInclude) throws IOException {
    LOG.trace("get backup history from backup system table");

    List<BackupInfo> list = getBackupInfos(toInclude);
    list.sort(Comparator.comparing(BackupInfo::getStartTs).reversed());
    return list;
  }

  /**
   * Retrieve all table names that are part of any known completed backup
   */
  public Set<TableName> getTablesIncludedInBackups() throws IOException {
    // Incremental backups have the same tables as the preceding full backups
    List<BackupInfo> infos =
      getBackupInfos(withState(BackupState.COMPLETE), withType(BackupType.FULL));
    return infos.stream().flatMap(info -> info.getTableNames().stream())
      .collect(Collectors.toSet());
  }

  /**
   * Goes through all backup history corresponding to the provided root folder, and collects all
   * backup info mentioning each of the provided tables.
   * @param set        the tables for which to collect the {@code BackupInfo}
   * @param backupRoot backup destination path to retrieve backup history for
   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
   *         least one {@code BackupInfo}
   * @throws IOException if getting the backup history fails
   */
  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(withRoot(backupRoot));
    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          List<BackupInfo> list =
            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup infos passing the given filters (ordered by ascending backup id)
   */
  public List<BackupInfo> getBackupInfos(BackupInfo.Filter... toInclude) throws IOException {
    return getBackupInfos(Integer.MAX_VALUE, toInclude);
  }

  /**
   * Get the first n backup infos passing the given filters (ordered by ascending backup id)
   */
  public List<BackupInfo> getBackupInfos(int n, BackupInfo.Filter... toInclude) throws IOException {
    LOG.trace("get backup infos from backup system table");

    if (n <= 0) {
      return Collections.emptyList();
    }

    Predicate<BackupInfo> combinedPredicate = Stream.of(toInclude)
      .map(filter -> (Predicate<BackupInfo>) filter).reduce(Predicate::and).orElse(x -> true);

    Scan scan = createScanForBackupHistory();
    List<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (combinedPredicate.test(context)) {
          list.add(context);
          if (list.size() == n) {
            break;
          }
        }
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to backup system table after a successful
   * full or incremental backup. The saved timestamp is that of the last log file that was already
   * backed up.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
      bufferedMutator.mutate(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of timestamps. The info is stored for each table as a concatenated string of
   * rs->timestamp
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName; value: map of RegionServer to
   *         its previous timestamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException("The last backup data read from the backup system table "
            + "is empty. Create a backup first.");
        }
        if (data.length > 0) {
          HashMap<String, Long> lastBackup =
            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
          tableTimestampMap.put(tn, lastBackup);
        }
      }
      return tableTimestampMap;
    }
  }

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in the backup system table. This API is used by
   * BackupLogCleaner.
   * @return true if at least one session exists in the backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    boolean result = false;
    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      if (scanner.next() != null) {
        result = true;
      }
      return result;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in a backup set
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.stream(tables).map(TableName::valueOf).collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
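   * <p>
   * For example (set and table names are illustrative):
   *
   * <pre>{@code
   * systemTable.addToBackupSet("nightly", new String[] { "ns:tbl1", "ns:tbl2" });
   * }</pre>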
   * @param name      set name
   * @param newTables list of table names to add
   * @throws IOException if a table operation fails
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
    return tables.toArray(new String[0]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get the descriptor of the backup system table for bulk-loaded data
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
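    // Incrementing the last byte of the prefix yields the smallest row that sorts after every
    // key starting with BACKUP_INFO_PREFIX: "session:" scans up to (but not including)
    // "session;", so the scan covers exactly the backup info rows.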
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table-> { RS -> ts} map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Puts for bulk loads.
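   * <p>
   * Row key layout: {@code bulk:<table>:<region>:<hfile name>}; each row stores the table name,
   * column family and full HFile path in the {@code meta} family.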
   */
  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
      for (Path path : entry.getValue()) {
        String file = path.toString();
        int lastSlash = file.lastIndexOf("/");
        String filename = file.substring(lastSlash + 1);
        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
          Bytes.toString(region), BLK_LD_DELIM, filename));
        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
        puts.add(put);
        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
      }
    }
    return puts;
  }

  public static void snapshot(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      Configuration conf = conn.getConfiguration();
      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
    }
  }

  public static void restoreFromSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        // restoreSnapshot requires the table to be disabled first
        admin.disableTable(BackupSystemTable.getTableName(conf));
        admin.restoreSnapshot(snapshotName);
        admin.enableTable(BackupSystemTable.getTableName(conf));
        LOG.debug("Done restoring backup system table");
      } else {
        // Snapshot does not exist, i.e. completeBackup failed after
        // deleting the backup system table snapshot.
        // In this case we log a WARN and proceed.
        LOG.warn(
          "Could not restore backup system table. Snapshot " + snapshotName + " does not exist.");
      }
    }
  }

  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
    List<SnapshotDescription> list = admin.listSnapshots();
    for (SnapshotDescription desc : list) {
      if (desc.getName().equals(snapshotName)) {
        return true;
      }
    }
    return false;
  }

  public static boolean snapshotExists(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }

  public static void deleteSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Done deleting backup system table snapshot");
      } else {
        LOG.error("Snapshot " + snapshotName + " does not exist");
      }
    }
  }

  private Put createPutForDeleteOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(DELETE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  private Delete createDeleteForBackupDeleteOperation() {
    Delete delete = new Delete(DELETE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForDeleteOperation() {
    Get get = new Get(DELETE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

  public void startDeleteOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
    }
    Put put = createPutForDeleteOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishDeleteOperation() throws IOException {
    LOG.trace("Finish delete operation for backup ids");

    Delete delete = createDeleteForBackupDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
    LOG.trace("Get delete operation for backup ids");

    Get get = createGetForDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  private Put createPutForMergeOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  public boolean isMergeInProgress() throws IOException {
    Get get = new Get(MERGE_OP_ROW);
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      return !res.isEmpty();
    }
  }
1439
1440  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
1441    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
1442    Put put = new Put(MERGE_OP_ROW);
1443    put.addColumn(META_FAMILY, PATH_COL, value);
1444    return put;
1445  }
1446
1447  private Delete createDeleteForBackupMergeOperation() {
1448    Delete delete = new Delete(MERGE_OP_ROW);
1449    delete.addFamily(META_FAMILY);
1450    return delete;
1451  }
1452
1453  private Get createGetForMergeOperation() {
1454    Get get = new Get(MERGE_OP_ROW);
1455    get.addFamily(META_FAMILY);
1456    return get;
1457  }
1458
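  /**
   * Records the backup ids of a merge operation in a marker row, so that an incomplete merge can
   * be detected later. Illustrative lifecycle (variable name is hypothetical, not taken from call
   * sites):
   *
   * <pre>
   * systemTable.startMergeOperation(backupIds);
   * // ... merge the backup images ...
   * systemTable.finishMergeOperation();
   * </pre>
   *
   * @param backupIdList list of backup ids being merged
   */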
  public void startMergeOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForMergeOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

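  /**
   * Records the tables already processed by an in-progress merge operation.
   * @param tables list of processed tables
   */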
  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update tables for merge: " + StringUtils.join(tables, ","));
    }
    Put put = createPutForUpdateTablesForMerge(tables);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

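  /**
   * Removes the merge operation marker row once the merge completes.
   */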
  public void finishMergeOperation() throws IOException {
    LOG.trace("Finish merge operation for backup ids");

    Delete delete = createDeleteForBackupMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

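  /**
   * Returns the backup ids recorded for an in-progress merge operation, or null if no merge
   * operation is recorded.
   */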
  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
    LOG.trace("Get backup ids for merge operation");

    Get get = createGetForMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  /**
   * Creates a scan to read all registered bulk loads for the given table, or for all tables if
   * {@code table} is {@code null}.
   */
  static Scan createScanForOrigBulkLoadedFiles(@Nullable TableName table) {
    Scan scan = new Scan();
    byte[] startRow = table == null
      ? BULK_LOAD_PREFIX_BYTES
      : rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
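    // Derive an exclusive stop row by incrementing the last byte of the start row, so the scan
    // covers exactly the rows that share the start-row prefix.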
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
  }

  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
    int idx = 3;
    if (parts.size() == 4) {
      // the table is in the default namespace, so the row key has no namespace segment
      idx = 2;
    }
    String region = parts.get(idx);
    LOG.debug("Bulk row string {}, region {}", rowStr, region);
    return region;
  }

  /**
   * Used to query bulk loaded hfiles which have been copied by incremental backup.
   * @param backupId the backup id; it can be null when querying for all tables
   * @return the Scan object
   * @deprecated This method is broken if a backupId is specified - see HBASE-28715
   */
  @Deprecated
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
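    // Exclusive stop row: same prefix with the last byte incremented.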
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Creates Scan operation to load backup set list
   * @return scan operation
   */
  private Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
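    // Exclusive stop row: same prefix with the last byte incremented.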
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation to load backup set content
   * @param name backup set's name
   * @return get operation
   */
  private Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Delete operation to delete backup set content
   * @param name backup set's name
   * @return delete operation
   */
  private Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates Put operation to update backup set content
   * @param name   backup set's name
   * @param tables list of tables
   * @return put operation
   */
  private Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
    return put;
  }

  private byte[] convertToByteArray(String[] tables) {
    return Bytes.toBytes(StringUtils.join(tables, ","));
  }

  /**
   * Converts cell to backup set list.
   * @param current current cell
   * @return backup set as array of table names
   */
  private String[] cellValueToBackupSet(Cell current) {
    byte[] data = CellUtil.cloneValue(current);
    if (!ArrayUtils.isEmpty(data)) {
      return Bytes.toString(data).split(",");
    }
    return new String[0];
  }

  /**
   * Converts cell key to backup set name.
   * @param current current cell
   * @return backup set name
   */
  private String cellKeyToBackupSetName(Cell current) {
    byte[] data = CellUtil.cloneRow(current);
    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
  }

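  /**
   * Concatenates the given strings and converts the result to a row key byte array.
   */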
  private static byte[] rowkey(String s, String... other) {
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return Bytes.toBytes(sb.toString());
  }

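  /**
   * Enables the given table if it is not already enabled, tolerating a concurrent enable.
   */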
  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
    if (!admin.isTableEnabled(tableName)) {
      try {
        admin.enableTable(tableName);
      } catch (TableNotDisabledException ignored) {
        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
      }
    }
  }
}