001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.impl;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.nio.charset.StandardCharsets;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Map;
032import java.util.Map.Entry;
033import java.util.Objects;
034import java.util.Set;
035import java.util.TreeMap;
036import java.util.TreeSet;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.ArrayUtils;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.CellUtil;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.NamespaceDescriptor;
046import org.apache.hadoop.hbase.NamespaceExistException;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableExistsException;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.backup.BackupInfo;
051import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
052import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
053import org.apache.hadoop.hbase.backup.BackupType;
054import org.apache.hadoop.hbase.backup.util.BackupUtils;
055import org.apache.hadoop.hbase.client.Admin;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
057import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
058import org.apache.hadoop.hbase.client.Connection;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Put;
062import org.apache.hadoop.hbase.client.Result;
063import org.apache.hadoop.hbase.client.ResultScanner;
064import org.apache.hadoop.hbase.client.Scan;
065import org.apache.hadoop.hbase.client.SnapshotDescription;
066import org.apache.hadoop.hbase.client.Table;
067import org.apache.hadoop.hbase.client.TableDescriptor;
068import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.Pair;
072import org.apache.yetus.audience.InterfaceAudience;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
077import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
078
079import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
081
082/**
083 * This class provides API to access backup system table<br>
084 * Backup system table schema:<br>
085 * <p>
086 * <ul>
087 * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
088 * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
089 * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
090 * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
091 * timestamp]</li>
092 * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
093 * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
094 * name</li>
095 * </ul>
096 * </p>
097 */
098@InterfaceAudience.Private
099public final class BackupSystemTable implements Closeable {
100
101  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
102
103  static class WALItem {
104    String backupId;
105    String walFile;
106    String backupRoot;
107
108    WALItem(String backupId, String walFile, String backupRoot) {
109      this.backupId = backupId;
110      this.walFile = walFile;
111      this.backupRoot = backupRoot;
112    }
113
114    public String getBackupId() {
115      return backupId;
116    }
117
118    public String getWalFile() {
119      return walFile;
120    }
121
122    public String getBackupRoot() {
123      return backupRoot;
124    }
125
126    @Override
127    public String toString() {
128      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
129    }
130  }
131
132  /**
133   * Backup system table (main) name
134   */
135  private TableName tableName;
136
137  /**
138   * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
139   * separate table because we have to isolate general backup operations: create, merge etc from
140   * activity of RegionObserver, which controls process of a bulk loading
141   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
142   */
143  private TableName bulkLoadTableName;
144
145  /**
146   * Stores backup sessions (contexts)
147   */
148  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
149  /**
150   * Stores other meta
151   */
152  final static byte[] META_FAMILY = Bytes.toBytes("meta");
153  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
154  /**
155   * Connection to HBase cluster, shared among all instances
156   */
157  private final Connection connection;
158
159  private final static String BACKUP_INFO_PREFIX = "session:";
160  private final static String START_CODE_ROW = "startcode:";
161  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
162  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");
163
164  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
165  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
166
167  private final static String INCR_BACKUP_SET = "incrbackupset:";
168  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
169  private final static String RS_LOG_TS_PREFIX = "rslogts:";
170
171  private final static String BULK_LOAD_PREFIX = "bulk:";
172  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
173  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
174  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");
175
176  final static byte[] TBL_COL = Bytes.toBytes("tbl");
177  final static byte[] FAM_COL = Bytes.toBytes("fam");
178  final static byte[] PATH_COL = Bytes.toBytes("path");
179  final static byte[] STATE_COL = Bytes.toBytes("state");
180  // the two states a bulk loaded file can be
181  final static byte[] BL_PREPARE = Bytes.toBytes("R");
182  final static byte[] BL_COMMIT = Bytes.toBytes("D");
183
184  private final static String SET_KEY_PREFIX = "backupset:";
185
186  // separator between BULK_LOAD_PREFIX and ordinals
187  private final static String BLK_LD_DELIM = ":";
188  private final static byte[] EMPTY_VALUE = new byte[] {};
189
190  // Safe delimiter in a string
191  private final static String NULL = "\u0000";
192
193  public BackupSystemTable(Connection conn) throws IOException {
194    this.connection = conn;
195    Configuration conf = this.connection.getConfiguration();
196    tableName = BackupSystemTable.getTableName(conf);
197    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
198    checkSystemTable();
199  }
200
201  private void checkSystemTable() throws IOException {
202    try (Admin admin = connection.getAdmin()) {
203      verifyNamespaceExists(admin);
204      Configuration conf = connection.getConfiguration();
205      if (!admin.tableExists(tableName)) {
206        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
207        createSystemTable(admin, backupHTD);
208      }
209      if (!admin.tableExists(bulkLoadTableName)) {
210        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
211        createSystemTable(admin, blHTD);
212      }
213      waitForSystemTable(admin, tableName);
214      waitForSystemTable(admin, bulkLoadTableName);
215    }
216  }
217
218  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
219    try {
220      admin.createTable(descriptor);
221    } catch (TableExistsException e) {
222      // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
223      // so may be subject to race conditions where one caller succeeds in creating the
224      // table and others fail because it now exists
225      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
226    }
227  }
228
229  private void verifyNamespaceExists(Admin admin) throws IOException {
230    String namespaceName = tableName.getNamespaceAsString();
231    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
232    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
233    boolean exists = false;
234    for (NamespaceDescriptor nsd : list) {
235      if (nsd.getName().equals(ns.getName())) {
236        exists = true;
237        break;
238      }
239    }
240    if (!exists) {
241      try {
242        admin.createNamespace(ns);
243      } catch (NamespaceExistException e) {
244        // swallow because this class is initialized in concurrent environments (i.e. bulkloads),
245        // so may be subject to race conditions where one caller succeeds in creating the
246        // namespace and others fail because it now exists
247        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
248      }
249    }
250  }
251
252  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
253    // Return fast if the table is available and avoid a log message
254    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
255      return;
256    }
257    long TIMEOUT = 60000;
258    long startTime = EnvironmentEdgeManager.currentTime();
259    LOG.debug("Backup table {} is not present and available, waiting for it to become so",
260      tableName);
261    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
262      try {
263        Thread.sleep(100);
264      } catch (InterruptedException e) {
265        throw (IOException) new InterruptedIOException().initCause(e);
266      }
267      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
268        throw new IOException(
269          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
270      }
271    }
272    LOG.debug("Backup table {} exists and available", tableName);
273  }
274
275  @Override
276  public void close() {
277    // do nothing
278  }
279
280  /**
281   * Updates status (state) of a backup session in backup system table table
282   * @param info backup info
283   * @throws IOException exception
284   */
285  public void updateBackupInfo(BackupInfo info) throws IOException {
286    if (LOG.isTraceEnabled()) {
287      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
288        + " set status=" + info.getState());
289    }
290    try (Table table = connection.getTable(tableName)) {
291      Put put = createPutForBackupInfo(info);
292      table.put(put);
293    }
294  }
295
296  /*
297   * @param backupId the backup Id
298   * @return Map of rows to path of bulk loaded hfile
299   */
300  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
301    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
302    try (Table table = connection.getTable(bulkLoadTableName);
303      ResultScanner scanner = table.getScanner(scan)) {
304      Result res = null;
305      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
306      while ((res = scanner.next()) != null) {
307        res.advance();
308        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
309        for (Cell cell : res.listCells()) {
310          if (
311            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
312              BackupSystemTable.PATH_COL.length) == 0
313          ) {
314            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
315          }
316        }
317      }
318      return map;
319    }
320  }
321
322  /*
323   * Used during restore
324   * @param backupId the backup Id
325   * @param sTableList List of tables
326   * @return array of Map of family to List of Paths
327   */
328  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
329    throws IOException {
330    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
331    @SuppressWarnings("unchecked")
332    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
333    try (Table table = connection.getTable(bulkLoadTableName);
334      ResultScanner scanner = table.getScanner(scan)) {
335      Result res = null;
336      while ((res = scanner.next()) != null) {
337        res.advance();
338        TableName tbl = null;
339        byte[] fam = null;
340        String path = null;
341        for (Cell cell : res.listCells()) {
342          if (
343            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
344              BackupSystemTable.TBL_COL.length) == 0
345          ) {
346            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
347          } else if (
348            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
349              BackupSystemTable.FAM_COL.length) == 0
350          ) {
351            fam = CellUtil.cloneValue(cell);
352          } else if (
353            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
354              BackupSystemTable.PATH_COL.length) == 0
355          ) {
356            path = Bytes.toString(CellUtil.cloneValue(cell));
357          }
358        }
359        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
360        if (srcIdx == -1) {
361          // the table is not among the query
362          continue;
363        }
364        if (mapForSrc[srcIdx] == null) {
365          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
366        }
367        List<Path> files;
368        if (!mapForSrc[srcIdx].containsKey(fam)) {
369          files = new ArrayList<Path>();
370          mapForSrc[srcIdx].put(fam, files);
371        } else {
372          files = mapForSrc[srcIdx].get(fam);
373        }
374        files.add(new Path(path));
375        if (LOG.isDebugEnabled()) {
376          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
377        }
378      }
379
380      return mapForSrc;
381    }
382  }
383
384  /**
385   * Deletes backup status from backup system table table
386   * @param backupId backup id
387   * @throws IOException exception
388   */
389  public void deleteBackupInfo(String backupId) throws IOException {
390    if (LOG.isTraceEnabled()) {
391      LOG.trace("delete backup status in backup system table for " + backupId);
392    }
393    try (Table table = connection.getTable(tableName)) {
394      Delete del = createDeleteForBackupInfo(backupId);
395      table.delete(del);
396    }
397  }
398
399  /*
400   * For postBulkLoadHFile() hook.
401   * @param tabName table name
402   * @param region the region receiving hfile
403   * @param finalPaths family and associated hfiles
404   */
405  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
406    Map<byte[], List<Path>> finalPaths) throws IOException {
407    if (LOG.isDebugEnabled()) {
408      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
409        + " entries");
410    }
411    try (Table table = connection.getTable(bulkLoadTableName)) {
412      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
413      table.put(puts);
414      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
415    }
416  }
417
418  /*
419   * For preCommitStoreFile() hook
420   * @param tabName table name
421   * @param region the region receiving hfile
422   * @param family column family
423   * @param pairs list of paths for hfiles
424   */
425  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
426    final List<Pair<Path, Path>> pairs) throws IOException {
427    if (LOG.isDebugEnabled()) {
428      LOG.debug(
429        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
430    }
431    try (Table table = connection.getTable(bulkLoadTableName)) {
432      List<Put> puts =
433        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
434      table.put(puts);
435      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
436    }
437  }
438
439  /*
440   * Removes rows recording bulk loaded hfiles from backup table
441   * @param lst list of table names
442   * @param rows the rows to be deleted
443   */
444  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
445    try (Table table = connection.getTable(bulkLoadTableName)) {
446      List<Delete> lstDels = new ArrayList<>();
447      for (byte[] row : rows) {
448        Delete del = new Delete(row);
449        lstDels.add(del);
450        LOG.debug("orig deleting the row: " + Bytes.toString(row));
451      }
452      table.delete(lstDels);
453      LOG.debug("deleted " + rows.size() + " original bulkload rows");
454    }
455  }
456
457  /*
458   * Reads the rows from backup table recording bulk loaded hfiles
459   * @param tableList list of table names
460   * @return The keys of the Map are table, region and column family. Value of the map reflects
461   * whether the hfile was recorded by preCommitStoreFile hook (true)
462   */
463  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
464    readBulkloadRows(List<TableName> tableList) throws IOException {
465
466    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
467    List<byte[]> rows = new ArrayList<>();
468    for (TableName tTable : tableList) {
469      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
470      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
471      try (Table table = connection.getTable(bulkLoadTableName);
472        ResultScanner scanner = table.getScanner(scan)) {
473        Result res = null;
474        while ((res = scanner.next()) != null) {
475          res.advance();
476          String fam = null;
477          String path = null;
478          boolean raw = false;
479          byte[] row;
480          String region = null;
481          for (Cell cell : res.listCells()) {
482            row = CellUtil.cloneRow(cell);
483            rows.add(row);
484            String rowStr = Bytes.toString(row);
485            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
486            if (
487              CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
488                BackupSystemTable.FAM_COL.length) == 0
489            ) {
490              fam = Bytes.toString(CellUtil.cloneValue(cell));
491            } else if (
492              CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
493                BackupSystemTable.PATH_COL.length) == 0
494            ) {
495              path = Bytes.toString(CellUtil.cloneValue(cell));
496            } else if (
497              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
498                BackupSystemTable.STATE_COL.length) == 0
499            ) {
500              byte[] state = CellUtil.cloneValue(cell);
501              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
502                raw = true;
503              } else {
504                raw = false;
505              }
506            }
507          }
508          if (map.get(tTable) == null) {
509            map.put(tTable, new HashMap<>());
510            tblMap = map.get(tTable);
511          }
512          if (tblMap.get(region) == null) {
513            tblMap.put(region, new HashMap<>());
514          }
515          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
516          if (famMap.get(fam) == null) {
517            famMap.put(fam, new ArrayList<>());
518          }
519          famMap.get(fam).add(new Pair<>(path, raw));
520          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
521        }
522      }
523    }
524    return new Pair<>(map, rows);
525  }
526
527  /*
528   * @param sTableList List of tables
529   * @param maps array of Map of family to List of Paths
530   * @param backupId the backup Id
531   */
532  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
533    String backupId) throws IOException {
534    try (Table table = connection.getTable(bulkLoadTableName)) {
535      long ts = EnvironmentEdgeManager.currentTime();
536      int cnt = 0;
537      List<Put> puts = new ArrayList<>();
538      for (int idx = 0; idx < maps.length; idx++) {
539        Map<byte[], List<Path>> map = maps[idx];
540        TableName tn = sTableList.get(idx);
541
542        if (map == null) {
543          continue;
544        }
545
546        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
547          byte[] fam = entry.getKey();
548          List<Path> paths = entry.getValue();
549          for (Path p : paths) {
550            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
551              ts, cnt++);
552            puts.add(put);
553          }
554        }
555      }
556      if (!puts.isEmpty()) {
557        table.put(puts);
558      }
559    }
560  }
561
562  /**
563   * Reads backup status object (instance of backup info) from backup system table table
564   * @param backupId backup id
565   * @return Current status of backup session or null
566   */
567  public BackupInfo readBackupInfo(String backupId) throws IOException {
568    if (LOG.isTraceEnabled()) {
569      LOG.trace("read backup status from backup system table for: " + backupId);
570    }
571
572    try (Table table = connection.getTable(tableName)) {
573      Get get = createGetForBackupInfo(backupId);
574      Result res = table.get(get);
575      if (res.isEmpty()) {
576        return null;
577      }
578      return resultToBackupInfo(res);
579    }
580  }
581
582  /**
583   * Read the last backup start code (timestamp) of last successful backup. Will return null if
584   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
585   * there is no successful backup completed so far.
586   * @param backupRoot directory path to backup destination
587   * @return the timestamp of last successful backup
588   * @throws IOException exception
589   */
590  public String readBackupStartCode(String backupRoot) throws IOException {
591    LOG.trace("read backup start code from backup system table");
592
593    try (Table table = connection.getTable(tableName)) {
594      Get get = createGetForStartCode(backupRoot);
595      Result res = table.get(get);
596      if (res.isEmpty()) {
597        return null;
598      }
599      Cell cell = res.listCells().get(0);
600      byte[] val = CellUtil.cloneValue(cell);
601      if (val.length == 0) {
602        return null;
603      }
604      return new String(val, StandardCharsets.UTF_8);
605    }
606  }
607
608  /**
609   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
610   * @param startCode  start code
611   * @param backupRoot root directory path to backup
612   * @throws IOException exception
613   */
614  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
615    if (LOG.isTraceEnabled()) {
616      LOG.trace("write backup start code to backup system table " + startCode);
617    }
618    try (Table table = connection.getTable(tableName)) {
619      Put put = createPutForStartCode(startCode.toString(), backupRoot);
620      table.put(put);
621    }
622  }
623
624  /**
625   * Exclusive operations are: create, delete, merge
626   * @throws IOException if a table operation fails or an active backup exclusive operation is
627   *                     already underway
628   */
629  public void startBackupExclusiveOperation() throws IOException {
630    LOG.debug("Start new backup exclusive operation");
631
632    try (Table table = connection.getTable(tableName)) {
633      Put put = createPutForStartBackupSession();
634      // First try to put if row does not exist
635      if (
636        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
637          .ifNotExists().thenPut(put)
638      ) {
639        // Row exists, try to put if value == ACTIVE_SESSION_NO
640        if (
641          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
642            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
643        ) {
644          throw new ExclusiveOperationException();
645        }
646      }
647    }
648  }
649
650  private Put createPutForStartBackupSession() {
651    Put put = new Put(ACTIVE_SESSION_ROW);
652    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
653    return put;
654  }
655
656  public void finishBackupExclusiveOperation() throws IOException {
657    LOG.debug("Finish backup exclusive operation");
658
659    try (Table table = connection.getTable(tableName)) {
660      Put put = createPutForStopBackupSession();
661      if (
662        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
663          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
664      ) {
665        throw new IOException("There is no active backup exclusive operation");
666      }
667    }
668  }
669
670  private Put createPutForStopBackupSession() {
671    Put put = new Put(ACTIVE_SESSION_ROW);
672    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
673    return put;
674  }
675
676  /**
677   * Get the Region Servers log information after the last log roll from backup system table.
678   * @param backupRoot root directory path to backup
679   * @return RS log info
680   * @throws IOException exception
681   */
682  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
683    throws IOException {
684    LOG.trace("read region server last roll log result to backup system table");
685
686    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
687
688    try (Table table = connection.getTable(tableName);
689      ResultScanner scanner = table.getScanner(scan)) {
690      Result res;
691      HashMap<String, Long> rsTimestampMap = new HashMap<>();
692      while ((res = scanner.next()) != null) {
693        res.advance();
694        Cell cell = res.current();
695        byte[] row = CellUtil.cloneRow(cell);
696        String server = getServerNameForReadRegionServerLastLogRollResult(row);
697        byte[] data = CellUtil.cloneValue(cell);
698        rsTimestampMap.put(server, Bytes.toLong(data));
699      }
700      return rsTimestampMap;
701    }
702  }
703
704  /**
705   * Writes Region Server last roll log result (timestamp) to backup system table table
706   * @param server     Region Server name
707   * @param ts         last log timestamp
708   * @param backupRoot root directory path to backup
709   * @throws IOException exception
710   */
711  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
712    throws IOException {
713    LOG.trace("write region server last roll log result to backup system table");
714
715    try (Table table = connection.getTable(tableName)) {
716      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
717      table.put(put);
718    }
719  }
720
721  /**
722   * Get all completed backup information (in desc order by time)
723   * @param onlyCompleted true, if only successfully completed sessions
724   * @return history info of BackupCompleteData
725   * @throws IOException exception
726   */
727  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
728    LOG.trace("get backup history from backup system table");
729
730    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
731    ArrayList<BackupInfo> list = getBackupInfos(state);
732    return BackupUtils.sortHistoryListDesc(list);
733  }
734
735  /**
736   * Get all backups history
737   * @return list of backup info
738   * @throws IOException if getting the backup history fails
739   */
740  public List<BackupInfo> getBackupHistory() throws IOException {
741    return getBackupHistory(false);
742  }
743
744  /**
745   * Get first n backup history records
746   * @param n number of records, if n== -1 - max number is ignored
747   * @return list of records
748   * @throws IOException if getting the backup history fails
749   */
750  public List<BackupInfo> getHistory(int n) throws IOException {
751    List<BackupInfo> history = getBackupHistory();
752    if (n == -1 || history.size() <= n) {
753      return history;
754    }
755    return Collections.unmodifiableList(history.subList(0, n));
756  }
757
758  /**
759   * Get backup history records filtered by list of filters.
760   * @param n       max number of records, if n == -1 , then max number is ignored
761   * @param filters list of filters
762   * @return backup records
763   * @throws IOException if getting the backup history fails
764   */
765  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
766    if (filters.length == 0) {
767      return getHistory(n);
768    }
769
770    List<BackupInfo> history = getBackupHistory();
771    List<BackupInfo> result = new ArrayList<>();
772    for (BackupInfo bi : history) {
773      if (n >= 0 && result.size() == n) {
774        break;
775      }
776
777      boolean passed = true;
778      for (int i = 0; i < filters.length; i++) {
779        if (!filters[i].apply(bi)) {
780          passed = false;
781          break;
782        }
783      }
784      if (passed) {
785        result.add(bi);
786      }
787    }
788    return result;
789  }
790
791  /*
792   * Retrieve TableName's for completed backup of given type
793   * @param type backup type
794   * @return List of table names
795   */
796  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
797    Set<TableName> names = new HashSet<>();
798    List<BackupInfo> infos = getBackupHistory(true);
799    for (BackupInfo info : infos) {
800      if (info.getType() == type) {
801        names.addAll(info.getTableNames());
802      }
803    }
804    return new ArrayList<>(names);
805  }
806
807  /**
808   * Get history for backup destination
809   * @param backupRoot backup destination path
810   * @return List of backup info
811   * @throws IOException if getting the backup history fails
812   */
813  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
814    ArrayList<BackupInfo> history = getBackupHistory(false);
815    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
816      BackupInfo info = iterator.next();
817      if (!backupRoot.equals(info.getBackupRootDir())) {
818        iterator.remove();
819      }
820    }
821    return history;
822  }
823
824  /**
825   * Get history for a table
826   * @param name table name
827   * @return history for a table
828   * @throws IOException if getting the backup history fails
829   */
830  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
831    List<BackupInfo> history = getBackupHistory();
832    List<BackupInfo> tableHistory = new ArrayList<>();
833    for (BackupInfo info : history) {
834      List<TableName> tables = info.getTableNames();
835      if (tables.contains(name)) {
836        tableHistory.add(info);
837      }
838    }
839    return tableHistory;
840  }
841
842  public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
843    String backupRoot) throws IOException {
844    List<BackupInfo> history = getBackupHistory(backupRoot);
845    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = new HashMap<>();
846    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
847      BackupInfo info = iterator.next();
848      if (!backupRoot.equals(info.getBackupRootDir())) {
849        continue;
850      }
851      List<TableName> tables = info.getTableNames();
852      for (TableName tableName : tables) {
853        if (set.contains(tableName)) {
854          ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
855          if (list == null) {
856            list = new ArrayList<>();
857            tableHistoryMap.put(tableName, list);
858          }
859          list.add(info);
860        }
861      }
862    }
863    return tableHistoryMap;
864  }
865
866  /**
867   * Get all backup sessions with a given state (in descending order by time)
868   * @param state backup session state
869   * @return history info of backup info objects
870   * @throws IOException exception
871   */
872  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
873    LOG.trace("get backup infos from backup system table");
874
875    Scan scan = createScanForBackupHistory();
876    ArrayList<BackupInfo> list = new ArrayList<>();
877
878    try (Table table = connection.getTable(tableName);
879      ResultScanner scanner = table.getScanner(scan)) {
880      Result res;
881      while ((res = scanner.next()) != null) {
882        res.advance();
883        BackupInfo context = cellToBackupInfo(res.current());
884        if (state != BackupState.ANY && context.getState() != state) {
885          continue;
886        }
887        list.add(context);
888      }
889      return list;
890    }
891  }
892
893  /**
894   * Write the current timestamps for each regionserver to backup system table after a successful
895   * full or incremental backup. The saved timestamp is of the last log file that was backed up
896   * already.
897   * @param tables        tables
898   * @param newTimestamps timestamps
899   * @param backupRoot    root directory path to backup
900   * @throws IOException exception
901   */
902  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
903    String backupRoot) throws IOException {
904    if (LOG.isTraceEnabled()) {
905      LOG.trace("write RS log time stamps to backup system table for tables ["
906        + StringUtils.join(tables, ",") + "]");
907    }
908    List<Put> puts = new ArrayList<>();
909    for (TableName table : tables) {
910      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
911      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
912      puts.add(put);
913    }
914    try (Table table = connection.getTable(tableName)) {
915      table.put(puts);
916    }
917  }
918
919  /**
920   * Read the timestamp for each region server log after the last successful backup. Each table has
921   * its own set of the timestamps. The info is stored for each table as a concatenated string of
922   * rs->timestapmp
923   * @param backupRoot root directory path to backup
924   * @return the timestamp for each region server. key: tableName value:
925   *         RegionServer,PreviousTimeStamp
926   * @throws IOException exception
927   */
928  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
929    throws IOException {
930    if (LOG.isTraceEnabled()) {
931      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
932    }
933
934    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
935
936    Scan scan = createScanForReadLogTimestampMap(backupRoot);
937    try (Table table = connection.getTable(tableName);
938      ResultScanner scanner = table.getScanner(scan)) {
939      Result res;
940      while ((res = scanner.next()) != null) {
941        res.advance();
942        Cell cell = res.current();
943        byte[] row = CellUtil.cloneRow(cell);
944        String tabName = getTableNameForReadLogTimestampMap(row);
945        TableName tn = TableName.valueOf(tabName);
946        byte[] data = CellUtil.cloneValue(cell);
947        if (data == null) {
948          throw new IOException("Data of last backup data from backup system table "
949            + "is empty. Create a backup first.");
950        }
951        if (data != null && data.length > 0) {
952          HashMap<String, Long> lastBackup =
953            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
954          tableTimestampMap.put(tn, lastBackup);
955        }
956      }
957      return tableTimestampMap;
958    }
959  }
960
961  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
962    Map<String, Long> map) {
963    BackupProtos.TableServerTimestamp.Builder tstBuilder =
964      BackupProtos.TableServerTimestamp.newBuilder();
965    tstBuilder
966      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
967
968    for (Entry<String, Long> entry : map.entrySet()) {
969      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
970      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
971      ServerName sn = ServerName.parseServerName(entry.getKey());
972      snBuilder.setHostName(sn.getHostname());
973      snBuilder.setPort(sn.getPort());
974      builder.setServerName(snBuilder.build());
975      builder.setTimestamp(entry.getValue());
976      tstBuilder.addServerTimestamp(builder.build());
977    }
978
979    return tstBuilder.build();
980  }
981
982  private HashMap<String, Long>
983    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {
984
985    HashMap<String, Long> map = new HashMap<>();
986    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
987    for (BackupProtos.ServerTimestamp st : list) {
988      ServerName sn =
989        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
990      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
991    }
992    return map;
993  }
994
995  /**
996   * Return the current tables covered by incremental backup.
997   * @param backupRoot root directory path to backup
998   * @return set of tableNames
999   * @throws IOException exception
1000   */
1001  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
1002    LOG.trace("get incremental backup table set from backup system table");
1003
1004    TreeSet<TableName> set = new TreeSet<>();
1005
1006    try (Table table = connection.getTable(tableName)) {
1007      Get get = createGetForIncrBackupTableSet(backupRoot);
1008      Result res = table.get(get);
1009      if (res.isEmpty()) {
1010        return set;
1011      }
1012      List<Cell> cells = res.listCells();
1013      for (Cell cell : cells) {
1014        // qualifier = table name - we use table names as qualifiers
1015        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
1016      }
1017      return set;
1018    }
1019  }
1020
1021  /**
1022   * Add tables to global incremental backup set
1023   * @param tables     set of tables
1024   * @param backupRoot root directory path to backup
1025   * @throws IOException exception
1026   */
1027  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
1028    throws IOException {
1029    if (LOG.isTraceEnabled()) {
1030      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
1031        + " tables [" + StringUtils.join(tables, " ") + "]");
1032    }
1033    if (LOG.isDebugEnabled()) {
1034      tables.forEach(table -> LOG.debug(Objects.toString(table)));
1035    }
1036    try (Table table = connection.getTable(tableName)) {
1037      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
1038      table.put(put);
1039    }
1040  }
1041
1042  /**
1043   * Deletes incremental backup set for a backup destination
1044   * @param backupRoot backup root
1045   */
1046  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
1047    if (LOG.isTraceEnabled()) {
1048      LOG.trace("Delete incremental backup table set to backup system table. ROOT=" + backupRoot);
1049    }
1050    try (Table table = connection.getTable(tableName)) {
1051      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
1052      table.delete(delete);
1053    }
1054  }
1055
1056  /**
1057   * Checks if we have at least one backup session in backup system table This API is used by
1058   * BackupLogCleaner
1059   * @return true, if - at least one session exists in backup system table table
1060   * @throws IOException exception
1061   */
1062  public boolean hasBackupSessions() throws IOException {
1063    LOG.trace("Has backup sessions from backup system table");
1064
1065    boolean result = false;
1066    Scan scan = createScanForBackupHistory();
1067    scan.setCaching(1);
1068    try (Table table = connection.getTable(tableName);
1069      ResultScanner scanner = table.getScanner(scan)) {
1070      if (scanner.next() != null) {
1071        result = true;
1072      }
1073      return result;
1074    }
1075  }
1076
1077  /**
1078   * BACKUP SETS
1079   */
1080
1081  /**
1082   * Get backup set list
1083   * @return backup set list
1084   * @throws IOException if a table or scanner operation fails
1085   */
1086  public List<String> listBackupSets() throws IOException {
1087    LOG.trace("Backup set list");
1088
1089    List<String> list = new ArrayList<>();
1090    try (Table table = connection.getTable(tableName)) {
1091      Scan scan = createScanForBackupSetList();
1092      scan.readVersions(1);
1093      try (ResultScanner scanner = table.getScanner(scan)) {
1094        Result res;
1095        while ((res = scanner.next()) != null) {
1096          res.advance();
1097          list.add(cellKeyToBackupSetName(res.current()));
1098        }
1099        return list;
1100      }
1101    }
1102  }
1103
1104  /**
1105   * Get backup set description (list of tables)
1106   * @param name set's name
1107   * @return list of tables in a backup set
1108   * @throws IOException if a table operation fails
1109   */
1110  public List<TableName> describeBackupSet(String name) throws IOException {
1111    if (LOG.isTraceEnabled()) {
1112      LOG.trace(" Backup set describe: " + name);
1113    }
1114    try (Table table = connection.getTable(tableName)) {
1115      Get get = createGetForBackupSet(name);
1116      Result res = table.get(get);
1117      if (res.isEmpty()) {
1118        return null;
1119      }
1120      res.advance();
1121      String[] tables = cellValueToBackupSet(res.current());
1122      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
1123        .collect(Collectors.toList());
1124    }
1125  }
1126
1127  /**
1128   * Add backup set (list of tables)
1129   * @param name      set name
1130   * @param newTables list of tables, comma-separated
1131   * @throws IOException if a table operation fails
1132   */
1133  public void addToBackupSet(String name, String[] newTables) throws IOException {
1134    if (LOG.isTraceEnabled()) {
1135      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
1136    }
1137    String[] union = null;
1138    try (Table table = connection.getTable(tableName)) {
1139      Get get = createGetForBackupSet(name);
1140      Result res = table.get(get);
1141      if (res.isEmpty()) {
1142        union = newTables;
1143      } else {
1144        res.advance();
1145        String[] tables = cellValueToBackupSet(res.current());
1146        union = merge(tables, newTables);
1147      }
1148      Put put = createPutForBackupSet(name, union);
1149      table.put(put);
1150    }
1151  }
1152
1153  /**
1154   * Remove tables from backup set (list of tables)
1155   * @param name     set name
1156   * @param toRemove list of tables
1157   * @throws IOException if a table operation or deleting the backup set fails
1158   */
1159  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
1160    if (LOG.isTraceEnabled()) {
1161      LOG.trace(
1162        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
1163    }
1164    String[] disjoint;
1165    String[] tables;
1166    try (Table table = connection.getTable(tableName)) {
1167      Get get = createGetForBackupSet(name);
1168      Result res = table.get(get);
1169      if (res.isEmpty()) {
1170        LOG.warn("Backup set '" + name + "' not found.");
1171        return;
1172      } else {
1173        res.advance();
1174        tables = cellValueToBackupSet(res.current());
1175        disjoint = disjoin(tables, toRemove);
1176      }
1177      if (disjoint.length > 0 && disjoint.length != tables.length) {
1178        Put put = createPutForBackupSet(name, disjoint);
1179        table.put(put);
1180      } else if (disjoint.length == tables.length) {
1181        LOG.warn("Backup set '" + name + "' does not contain tables ["
1182          + StringUtils.join(toRemove, " ") + "]");
1183      } else { // disjoint.length == 0 and tables.length >0
1184        // Delete backup set
1185        LOG.info("Backup set '" + name + "' is empty. Deleting.");
1186        deleteBackupSet(name);
1187      }
1188    }
1189  }
1190
1191  private String[] merge(String[] existingTables, String[] newTables) {
1192    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
1193    tables.addAll(Arrays.asList(newTables));
1194    return tables.toArray(new String[0]);
1195  }
1196
1197  private String[] disjoin(String[] existingTables, String[] toRemove) {
1198    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
1199    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
1200    return tables.toArray(new String[0]);
1201  }
1202
1203  /**
1204   * Delete backup set
1205   * @param name set's name
1206   * @throws IOException if getting or deleting the table fails
1207   */
1208  public void deleteBackupSet(String name) throws IOException {
1209    if (LOG.isTraceEnabled()) {
1210      LOG.trace(" Backup set delete: " + name);
1211    }
1212    try (Table table = connection.getTable(tableName)) {
1213      Delete del = createDeleteForBackupSet(name);
1214      table.delete(del);
1215    }
1216  }
1217
1218  /**
1219   * Get backup system table descriptor
1220   * @return table's descriptor
1221   */
1222  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
1223    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
1224
1225    ColumnFamilyDescriptorBuilder colBuilder =
1226      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
1227
1228    colBuilder.setMaxVersions(1);
1229    Configuration config = HBaseConfiguration.create();
1230    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
1231      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1232    colBuilder.setTimeToLive(ttl);
1233
1234    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
1235    builder.setColumnFamily(colSessionsDesc);
1236
1237    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
1238    colBuilder.setTimeToLive(ttl);
1239    builder.setColumnFamily(colBuilder.build());
1240    return builder.build();
1241  }
1242
1243  public static TableName getTableName(Configuration conf) {
1244    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
1245      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
1246    return TableName.valueOf(name);
1247  }
1248
1249  public static String getTableNameAsString(Configuration conf) {
1250    return getTableName(conf).getNameAsString();
1251  }
1252
1253  public static String getSnapshotName(Configuration conf) {
1254    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
1255  }
1256
1257  /**
1258   * Get backup system table descriptor
1259   * @return table's descriptor
1260   */
1261  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
1262    TableDescriptorBuilder builder =
1263      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
1264
1265    ColumnFamilyDescriptorBuilder colBuilder =
1266      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
1267    colBuilder.setMaxVersions(1);
1268    Configuration config = HBaseConfiguration.create();
1269    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
1270      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1271    colBuilder.setTimeToLive(ttl);
1272    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
1273    builder.setColumnFamily(colSessionsDesc);
1274    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
1275    colBuilder.setTimeToLive(ttl);
1276    builder.setColumnFamily(colBuilder.build());
1277    return builder.build();
1278  }
1279
1280  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
1281    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
1282      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
1283    return TableName.valueOf(name);
1284  }
1285
1286  /**
1287   * Creates Put operation for a given backup info object
1288   * @param context backup info
1289   * @return put operation
1290   * @throws IOException exception
1291   */
1292  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
1293    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
1294    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
1295      context.toByteArray());
1296    return put;
1297  }
1298
1299  /**
1300   * Creates Get operation for a given backup id
1301   * @param backupId backup's ID
1302   * @return get operation
1303   * @throws IOException exception
1304   */
1305  private Get createGetForBackupInfo(String backupId) throws IOException {
1306    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
1307    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1308    get.readVersions(1);
1309    return get;
1310  }
1311
1312  /**
1313   * Creates Delete operation for a given backup id
1314   * @param backupId backup's ID
1315   * @return delete operation
1316   */
1317  private Delete createDeleteForBackupInfo(String backupId) {
1318    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
1319    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1320    return del;
1321  }
1322
1323  /**
1324   * Converts Result to BackupInfo
1325   * @param res HBase result
1326   * @return backup info instance
1327   * @throws IOException exception
1328   */
1329  private BackupInfo resultToBackupInfo(Result res) throws IOException {
1330    res.advance();
1331    Cell cell = res.current();
1332    return cellToBackupInfo(cell);
1333  }
1334
1335  /**
1336   * Creates Get operation to retrieve start code from backup system table
1337   * @return get operation
1338   * @throws IOException exception
1339   */
1340  private Get createGetForStartCode(String rootPath) throws IOException {
1341    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
1342    get.addFamily(BackupSystemTable.META_FAMILY);
1343    get.readVersions(1);
1344    return get;
1345  }
1346
1347  /**
1348   * Creates Put operation to store start code to backup system table
1349   * @return put operation
1350   */
1351  private Put createPutForStartCode(String startCode, String rootPath) {
1352    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
1353    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
1354      Bytes.toBytes(startCode));
1355    return put;
1356  }
1357
1358  /**
1359   * Creates Get to retrieve incremental backup table set from backup system table
1360   * @return get operation
1361   * @throws IOException exception
1362   */
1363  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
1364    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
1365    get.addFamily(BackupSystemTable.META_FAMILY);
1366    get.readVersions(1);
1367    return get;
1368  }
1369
1370  /**
1371   * Creates Put to store incremental backup table set
1372   * @param tables tables
1373   * @return put operation
1374   */
1375  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
1376    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
1377    for (TableName table : tables) {
1378      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
1379        EMPTY_VALUE);
1380    }
1381    return put;
1382  }
1383
1384  /**
1385   * Creates Delete for incremental backup table set
1386   * @param backupRoot backup root
1387   * @return delete operation
1388   */
1389  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
1390    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
1391    delete.addFamily(BackupSystemTable.META_FAMILY);
1392    return delete;
1393  }
1394
1395  /**
1396   * Creates Scan operation to load backup history
1397   * @return scan operation
1398   */
1399  private Scan createScanForBackupHistory() {
1400    Scan scan = new Scan();
1401    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
1402    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1403    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1404    scan.withStartRow(startRow);
1405    scan.withStopRow(stopRow);
1406    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
1407    scan.readVersions(1);
1408    return scan;
1409  }
1410
1411  /**
1412   * Converts cell to backup info instance.
1413   * @param current current cell
1414   * @return backup backup info instance
1415   * @throws IOException exception
1416   */
1417  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
1418    byte[] data = CellUtil.cloneValue(current);
1419    return BackupInfo.fromByteArray(data);
1420  }
1421
1422  /**
1423   * Creates Put to write RS last roll log timestamp map
1424   * @param table table
1425   * @param smap  map, containing RS:ts
1426   * @return put operation
1427   */
1428  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
1429    String backupRoot) {
1430    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
1431    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
1432    return put;
1433  }
1434
1435  /**
1436   * Creates Scan to load table-> { RS -> ts} map of maps
1437   * @return scan operation
1438   */
1439  private Scan createScanForReadLogTimestampMap(String backupRoot) {
1440    Scan scan = new Scan();
1441    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
1442    scan.addFamily(BackupSystemTable.META_FAMILY);
1443
1444    return scan;
1445  }
1446
1447  /**
1448   * Get table name from rowkey
1449   * @param cloneRow rowkey
1450   * @return table name
1451   */
1452  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
1453    String s = Bytes.toString(cloneRow);
1454    int index = s.lastIndexOf(NULL);
1455    return s.substring(index + 1);
1456  }
1457
1458  /**
1459   * Creates Put to store RS last log result
1460   * @param server    server name
1461   * @param timestamp log roll result (timestamp)
1462   * @return put operation
1463   */
1464  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
1465    String backupRoot) {
1466    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
1467    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
1468      Bytes.toBytes(timestamp));
1469    return put;
1470  }
1471
1472  /**
1473   * Creates Scan operation to load last RS log roll results
1474   * @return scan operation
1475   */
1476  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
1477    Scan scan = new Scan();
1478    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
1479    scan.addFamily(BackupSystemTable.META_FAMILY);
1480    scan.readVersions(1);
1481
1482    return scan;
1483  }
1484
1485  /**
1486   * Get server's name from rowkey
1487   * @param row rowkey
1488   * @return server's name
1489   */
1490  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
1491    String s = Bytes.toString(row);
1492    int index = s.lastIndexOf(NULL);
1493    return s.substring(index + 1);
1494  }
1495
1496  /*
1497   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
1498   */
1499  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
1500    Map<byte[], List<Path>> finalPaths) {
1501    List<Put> puts = new ArrayList<>();
1502    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
1503      for (Path path : entry.getValue()) {
1504        String file = path.toString();
1505        int lastSlash = file.lastIndexOf("/");
1506        String filename = file.substring(lastSlash + 1);
1507        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1508          Bytes.toString(region), BLK_LD_DELIM, filename));
1509        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1510        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
1511        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1512        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
1513        puts.add(put);
1514        LOG
1515          .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
1516      }
1517    }
1518    return puts;
1519  }
1520
1521  public static void snapshot(Connection conn) throws IOException {
1522    try (Admin admin = conn.getAdmin()) {
1523      Configuration conf = conn.getConfiguration();
1524      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
1525    }
1526  }
1527
1528  public static void restoreFromSnapshot(Connection conn) throws IOException {
1529    Configuration conf = conn.getConfiguration();
1530    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
1531    try (Admin admin = conn.getAdmin()) {
1532      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1533      if (snapshotExists(admin, snapshotName)) {
1534        admin.disableTable(BackupSystemTable.getTableName(conf));
1535        admin.restoreSnapshot(snapshotName);
1536        admin.enableTable(BackupSystemTable.getTableName(conf));
1537        LOG.debug("Done restoring backup system table");
1538      } else {
1539        // Snapshot does not exists, i.e completeBackup failed after
1540        // deleting backup system table snapshot
1541        // In this case we log WARN and proceed
1542        LOG.warn(
1543          "Could not restore backup system table. Snapshot " + snapshotName + " does not exists.");
1544      }
1545    }
1546  }
1547
1548  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
1549    List<SnapshotDescription> list = admin.listSnapshots();
1550    for (SnapshotDescription desc : list) {
1551      if (desc.getName().equals(snapshotName)) {
1552        return true;
1553      }
1554    }
1555    return false;
1556  }
1557
1558  public static boolean snapshotExists(Connection conn) throws IOException {
1559    return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
1560  }
1561
1562  public static void deleteSnapshot(Connection conn) throws IOException {
1563    Configuration conf = conn.getConfiguration();
1564    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
1565    try (Admin admin = conn.getAdmin()) {
1566      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1567      if (snapshotExists(admin, snapshotName)) {
1568        admin.deleteSnapshot(snapshotName);
1569        LOG.debug("Done deleting backup system table snapshot");
1570      } else {
1571        LOG.error("Snapshot " + snapshotName + " does not exists");
1572      }
1573    }
1574  }
1575
1576  /*
1577   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
1578   */
1579  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
1580    final List<Pair<Path, Path>> pairs) {
1581    List<Put> puts = new ArrayList<>(pairs.size());
1582    for (Pair<Path, Path> pair : pairs) {
1583      Path path = pair.getSecond();
1584      String file = path.toString();
1585      int lastSlash = file.lastIndexOf("/");
1586      String filename = file.substring(lastSlash + 1);
1587      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1588        Bytes.toString(region), BLK_LD_DELIM, filename));
1589      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1590      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
1591      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1592      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
1593      puts.add(put);
1594      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
1595    }
1596    return puts;
1597  }
1598
1599  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
1600    List<Delete> lstDels = new ArrayList<>(lst.size());
1601    for (TableName table : lst) {
1602      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
1603      del.addFamily(BackupSystemTable.META_FAMILY);
1604      lstDels.add(del);
1605    }
1606    return lstDels;
1607  }
1608
1609  private Put createPutForDeleteOperation(String[] backupIdList) {
1610    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1611    Put put = new Put(DELETE_OP_ROW);
1612    put.addColumn(META_FAMILY, FAM_COL, value);
1613    return put;
1614  }
1615
1616  private Delete createDeleteForBackupDeleteOperation() {
1617    Delete delete = new Delete(DELETE_OP_ROW);
1618    delete.addFamily(META_FAMILY);
1619    return delete;
1620  }
1621
1622  private Get createGetForDeleteOperation() {
1623    Get get = new Get(DELETE_OP_ROW);
1624    get.addFamily(META_FAMILY);
1625    return get;
1626  }
1627
1628  public void startDeleteOperation(String[] backupIdList) throws IOException {
1629    if (LOG.isTraceEnabled()) {
1630      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
1631    }
1632    Put put = createPutForDeleteOperation(backupIdList);
1633    try (Table table = connection.getTable(tableName)) {
1634      table.put(put);
1635    }
1636  }
1637
1638  public void finishDeleteOperation() throws IOException {
1639    LOG.trace("Finsih delete operation for backup ids");
1640
1641    Delete delete = createDeleteForBackupDeleteOperation();
1642    try (Table table = connection.getTable(tableName)) {
1643      table.delete(delete);
1644    }
1645  }
1646
1647  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
1648    LOG.trace("Get delete operation for backup ids");
1649
1650    Get get = createGetForDeleteOperation();
1651    try (Table table = connection.getTable(tableName)) {
1652      Result res = table.get(get);
1653      if (res.isEmpty()) {
1654        return null;
1655      }
1656      Cell cell = res.listCells().get(0);
1657      byte[] val = CellUtil.cloneValue(cell);
1658      if (val.length == 0) {
1659        return null;
1660      }
1661      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1662        .toArray(String[]::new);
1663    }
1664  }
1665
1666  private Put createPutForMergeOperation(String[] backupIdList) {
1667    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1668    Put put = new Put(MERGE_OP_ROW);
1669    put.addColumn(META_FAMILY, FAM_COL, value);
1670    return put;
1671  }
1672
1673  public boolean isMergeInProgress() throws IOException {
1674    Get get = new Get(MERGE_OP_ROW);
1675    try (Table table = connection.getTable(tableName)) {
1676      Result res = table.get(get);
1677      return !res.isEmpty();
1678    }
1679  }
1680
1681  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
1682    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
1683    Put put = new Put(MERGE_OP_ROW);
1684    put.addColumn(META_FAMILY, PATH_COL, value);
1685    return put;
1686  }
1687
1688  private Delete createDeleteForBackupMergeOperation() {
1689    Delete delete = new Delete(MERGE_OP_ROW);
1690    delete.addFamily(META_FAMILY);
1691    return delete;
1692  }
1693
1694  private Get createGetForMergeOperation() {
1695    Get get = new Get(MERGE_OP_ROW);
1696    get.addFamily(META_FAMILY);
1697    return get;
1698  }
1699
1700  public void startMergeOperation(String[] backupIdList) throws IOException {
1701    if (LOG.isTraceEnabled()) {
1702      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
1703    }
1704    Put put = createPutForMergeOperation(backupIdList);
1705    try (Table table = connection.getTable(tableName)) {
1706      table.put(put);
1707    }
1708  }
1709
1710  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
1711    if (LOG.isTraceEnabled()) {
1712      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
1713    }
1714    Put put = createPutForUpdateTablesForMerge(tables);
1715    try (Table table = connection.getTable(tableName)) {
1716      table.put(put);
1717    }
1718  }
1719
1720  public void finishMergeOperation() throws IOException {
1721    LOG.trace("Finish merge operation for backup ids");
1722
1723    Delete delete = createDeleteForBackupMergeOperation();
1724    try (Table table = connection.getTable(tableName)) {
1725      table.delete(delete);
1726    }
1727  }
1728
1729  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
1730    LOG.trace("Get backup ids for merge operation");
1731
1732    Get get = createGetForMergeOperation();
1733    try (Table table = connection.getTable(tableName)) {
1734      Result res = table.get(get);
1735      if (res.isEmpty()) {
1736        return null;
1737      }
1738      Cell cell = res.listCells().get(0);
1739      byte[] val = CellUtil.cloneValue(cell);
1740      if (val.length == 0) {
1741        return null;
1742      }
1743      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1744        .toArray(String[]::new);
1745    }
1746  }
1747
1748  static Scan createScanForOrigBulkLoadedFiles(TableName table) {
1749    Scan scan = new Scan();
1750    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
1751    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1752    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1753    scan.withStartRow(startRow);
1754    scan.withStopRow(stopRow);
1755    scan.addFamily(BackupSystemTable.META_FAMILY);
1756    scan.readVersions(1);
1757    return scan;
1758  }
1759
1760  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
1761    // format is bulk : namespace : table : region : file
1762    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
1763  }
1764
1765  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
1766    // format is bulk : namespace : table : region : file
1767    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
1768    Iterator<String> i = parts.iterator();
1769    int idx = 3;
1770    if (parts.size() == 4) {
1771      // the table is in default namespace
1772      idx = 2;
1773    }
1774    String region = Iterators.get(i, idx);
1775    LOG.debug("bulk row string " + rowStr + " region " + region);
1776    return region;
1777  }
1778
1779  /*
1780   * Used to query bulk loaded hfiles which have been copied by incremental backup
1781   * @param backupId the backup Id. It can be null when querying for all tables
1782   * @return the Scan object
1783   */
1784  static Scan createScanForBulkLoadedFiles(String backupId) {
1785    Scan scan = new Scan();
1786    byte[] startRow =
1787      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
1788    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1789    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1790    scan.withStartRow(startRow);
1791    scan.withStopRow(stopRow);
1792    scan.addFamily(BackupSystemTable.META_FAMILY);
1793    scan.readVersions(1);
1794    return scan;
1795  }
1796
1797  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
1798    long ts, int idx) {
1799    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
1800    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
1801    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
1802    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
1803    return put;
1804  }
1805
1806  /**
1807   * Creates Scan operation to load backup set list
1808   * @return scan operation
1809   */
1810  private Scan createScanForBackupSetList() {
1811    Scan scan = new Scan();
1812    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
1813    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1814    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1815    scan.withStartRow(startRow);
1816    scan.withStopRow(stopRow);
1817    scan.addFamily(BackupSystemTable.META_FAMILY);
1818    return scan;
1819  }
1820
1821  /**
1822   * Creates Get operation to load backup set content
1823   * @return get operation
1824   */
1825  private Get createGetForBackupSet(String name) {
1826    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
1827    get.addFamily(BackupSystemTable.META_FAMILY);
1828    return get;
1829  }
1830
1831  /**
1832   * Creates Delete operation to delete backup set content
1833   * @param name backup set's name
1834   * @return delete operation
1835   */
1836  private Delete createDeleteForBackupSet(String name) {
1837    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
1838    del.addFamily(BackupSystemTable.META_FAMILY);
1839    return del;
1840  }
1841
1842  /**
1843   * Creates Put operation to update backup set content
1844   * @param name   backup set's name
1845   * @param tables list of tables
1846   * @return put operation
1847   */
1848  private Put createPutForBackupSet(String name, String[] tables) {
1849    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
1850    byte[] value = convertToByteArray(tables);
1851    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
1852    return put;
1853  }
1854
1855  private byte[] convertToByteArray(String[] tables) {
1856    return Bytes.toBytes(StringUtils.join(tables, ","));
1857  }
1858
1859  /**
1860   * Converts cell to backup set list.
1861   * @param current current cell
1862   * @return backup set as array of table names
1863   */
1864  private String[] cellValueToBackupSet(Cell current) {
1865    byte[] data = CellUtil.cloneValue(current);
1866    if (!ArrayUtils.isEmpty(data)) {
1867      return Bytes.toString(data).split(",");
1868    }
1869    return new String[0];
1870  }
1871
1872  /**
1873   * Converts cell key to backup set name.
1874   * @param current current cell
1875   * @return backup set name
1876   */
1877  private String cellKeyToBackupSetName(Cell current) {
1878    byte[] data = CellUtil.cloneRow(current);
1879    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
1880  }
1881
1882  private static byte[] rowkey(String s, String... other) {
1883    StringBuilder sb = new StringBuilder(s);
1884    for (String ss : other) {
1885      sb.append(ss);
1886    }
1887    return Bytes.toBytes(sb.toString());
1888  }
1889}