/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.
 * <p>
 * Backup system table schema:
 * </p>
 * <ul>
 * <li>1. Backup sessions rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>3. Incremental backup set rowkey = "incrbackupset:" + backupRoot; column = "meta:" + table
 * name of included table; value = empty</li>
 * <li>4. Table-RS-timestamp map rowkey = "trslm:" + backupRoot + table_name; value = map[RS -> last
 * WAL timestamp]</li>
 * <li>5. RS - WAL ts map rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>6. WALs recorded rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ul>
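 * <p>
 * Example usage (a minimal sketch; {@code connection} is an open HBase connection and
 * {@code backupId} an existing backup session id):
 * </p>
 * <pre>{@code
 * try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
 *   BackupInfo info = systemTable.readBackupInfo(backupId);
 *   List<BackupInfo> history = systemTable.getBackupHistory();
 * }
 * }</pre>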
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge, etc.) from
   * the activity of the RegionObserver that tracks bulk loads,
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");
  final static byte[] STATE_COL = Bytes.toBytes("state");
  // the two states a bulk loaded file can be in
  final static byte[] BL_PREPARE = Bytes.toBytes("R");
  final static byte[] BL_COMMIT = Bytes.toBytes("D");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
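  // (the NUL character cannot occur in backup roots, table names or server names, so it safely
  // separates the components of the rowkeys built below)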
  private final static String NULL = "\u0000";

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        createSystemTable(admin, backupHTD);
      }
      ensureTableEnabled(admin, tableName);
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        createSystemTable(admin, blHTD);
      }
      ensureTableEnabled(admin, bulkLoadTableName);
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
    try {
      admin.createTable(descriptor);
    } catch (TableExistsException e) {
      // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
      // so it may be subject to race conditions where one caller succeeds in creating the
      // table and others fail because it now exists
      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      try {
        admin.createNamespace(ns);
      } catch (NamespaceExistException e) {
        // swallow because this class is initialized in concurrent environments (e.g. bulk loads),
        // so it may be subject to race conditions where one caller succeeds in creating the
        // namespace and others fail because it now exists
        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
      }
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    long TIMEOUT = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not yet present and available, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException().initCause(e);
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
        throw new IOException(
          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates status (state) of a backup session in backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /*
   * Used during restore
   * @param backupId the backup Id
   * @param sTableList List of tables
   * @return array of Map of family to List of Paths
   */
  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
    throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    @SuppressWarnings("unchecked")
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName tbl = null;
        byte[] fam = null;
        String path = null;
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = CellUtil.cloneValue(cell);
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
        if (srcIdx == -1) {
          // the table is not among the query
          continue;
        }
        if (mapForSrc[srcIdx] == null) {
          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        List<Path> files = mapForSrc[srcIdx].computeIfAbsent(fam, k -> new ArrayList<>());
        files.add(new Path(path));
        if (LOG.isDebugEnabled()) {
          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
        }
      }

      return mapForSrc;
    }
  }

  /**
   * Deletes backup status from backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /*
   * For postBulkLoadHFile() hook.
   * @param tabName table name
   * @param region the region receiving hfile
   * @param finalPaths family and associated hfiles
   */
  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
    Map<byte[], List<Path>> finalPaths) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
        + " entries");
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
      bufferedMutator.mutate(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * For preCommitStoreFile() hook
   * @param tabName table name
   * @param region the region receiving hfile
   * @param family column family
   * @param pairs list of paths for hfiles
   */
  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
    final List<Pair<Path, Path>> pairs) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
    }
    try (Table table = connection.getTable(bulkLoadTableName)) {
      List<Put> puts =
        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
      table.put(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * Removes rows recording bulk loaded hfiles from backup table
   * @param rows the rows to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Delete> lstDels = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        lstDels.add(del);
        LOG.debug("orig deleting the row: " + Bytes.toString(row));
      }
      bufferedMutator.mutate(lstDels);
      LOG.debug("deleted " + rows.size() + " original bulkload rows");
    }
  }

  /*
   * Reads the rows from backup table recording bulk loaded hfiles
   * @param tableList list of table names
   * @return The keys of the Map are table, region and column family. Value of the map reflects
   * whether the hfile was recorded by preCommitStoreFile hook (true)
   */
  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
    readBulkloadRows(List<TableName> tableList) throws IOException {

    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
    List<byte[]> rows = new ArrayList<>();
    for (TableName tTable : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
      try (Table table = connection.getTable(bulkLoadTableName);
        ResultScanner scanner = table.getScanner(scan)) {
        Result res = null;
        while ((res = scanner.next()) != null) {
          res.advance();
          String fam = null;
          String path = null;
          boolean raw = false;
          byte[] row;
          String region = null;
          for (Cell cell : res.listCells()) {
            row = CellUtil.cloneRow(cell);
            rows.add(row);
            String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
            if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
                BackupSystemTable.FAM_COL.length) == 0
            ) {
              fam = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
                BackupSystemTable.PATH_COL.length) == 0
            ) {
              path = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
                BackupSystemTable.STATE_COL.length) == 0
            ) {
              byte[] state = CellUtil.cloneValue(cell);
              raw = Bytes.equals(BackupSystemTable.BL_PREPARE, state);
            }
          }
          tblMap = map.computeIfAbsent(tTable, k -> new HashMap<>());
          Map<String, List<Pair<String, Boolean>>> famMap =
            tblMap.computeIfAbsent(region, k -> new HashMap<>());
          famMap.computeIfAbsent(fam, k -> new ArrayList<>()).add(new Pair<>(path, raw));
          LOG.debug("found orig " + path + " for " + fam + " of region " + region);
        }
      }
    }
    return new Pair<>(map, rows);
  }

  /*
   * @param sTableList List of tables
   * @param maps array of Map of family to List of Paths
   * @param backupId the backup Id
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
    String backupId) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      long ts = EnvironmentEdgeManager.currentTime();
      int cnt = 0;
      List<Put> puts = new ArrayList<>();
      for (int idx = 0; idx < maps.length; idx++) {
        Map<byte[], List<Path>> map = maps[idx];
        TableName tn = sTableList.get(idx);

        if (map == null) {
          continue;
        }

        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
          byte[] fam = entry.getKey();
          List<Path> paths = entry.getValue();
          for (Path p : paths) {
            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
              ts, cnt++);
            puts.add(put);
          }
        }
      }
      if (!puts.isEmpty()) {
        bufferedMutator.mutate(puts);
      }
    }
  }

  /**
   * Reads backup status object (instance of backup info) from backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in HBase or the stored value has length 0; both cases indicate that no
   * backup has completed successfully so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val, StandardCharsets.UTF_8);
    }
  }

  /**
   * Write the start code (timestamp) to backup system table.
   * @param startCode  start code (must not be null)
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Starts a new backup exclusive operation. Exclusive operations are: create, delete, merge.
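   * <p>
   * Typical usage (a sketch; pair this call with {@link #finishBackupExclusiveOperation()}):
   * </p>
   * <pre>{@code
   * systemTable.startBackupExclusiveOperation();
   * try {
   *   // perform the exclusive backup operation (create, delete or merge)
   * } finally {
   *   systemTable.finishBackupExclusiveOperation();
   * }
   * }</pre>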
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last roll log result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all completed backup information (in descending order by time)
   * @param onlyCompleted if true, return only successfully completed sessions
   * @return list of backup info records
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n number of records to return; if n == -1, no limit is applied
   * @return list of records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }

  /**
   * Get backup history records filtered by list of filters.
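   * <p>
   * For example (a sketch, assuming {@code BackupInfo.Filter} can be given as a lambda):
   * </p>
   * <pre>{@code
   * List<BackupInfo> recent = systemTable.getBackupHistory(10,
   *   info -> info.getState() == BackupState.COMPLETE,
   *   info -> info.getType() == BackupType.FULL);
   * }</pre>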
   * @param n       max number of records to return; if n == -1, no limit is applied
   * @param filters list of filters
   * @return backup records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }

      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }

  /*
   * Retrieve TableNames for completed backups of a given type
   * @param type backup type
   * @return List of table names
   */
  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      if (info.getType() == type) {
        names.addAll(info.getTableNames());
      }
    }
    return new ArrayList<>(names);
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    history.removeIf(info -> !backupRoot.equals(info.getBackupRootDir()));
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  /**
   * Goes through all backup history corresponding to the provided root folder, and collects all
   * backup info mentioning each of the provided tables.
   * @param set        the tables for which to collect the {@code BackupInfo}
   * @param backupRoot backup destination path to retrieve backup history for
   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
   *         least one {@code BackupInfo}
   * @throws IOException if getting the backup history fails
   */
  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          List<BackupInfo> list =
            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to backup system table after a successful
   * full or incremental backup. The saved timestamp is of the last log file that was backed up
   * already.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
      bufferedMutator.mutate(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of the timestamps. The info is stored for each table as a concatenated string of
   * rs->timestamp
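   * <p>
   * Example of consuming the result (a sketch):
   * </p>
   * <pre>{@code
   * Map<TableName, Map<String, Long>> map = systemTable.readLogTimestampMap(backupRoot);
   * map.forEach((table, serverTimestamps) -> serverTimestamps
   *   .forEach((server, ts) -> System.out.println(table + ": " + server + " -> " + ts)));
   * }</pre>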
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName value:
   *         RegionServer,PreviousTimeStamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException("Data of last backup data from backup system table "
            + "is empty. Create a backup first.");
        }
        if (data.length > 0) {
          HashMap<String, Long> lastBackup =
            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
          tableTimestampMap.put(tn, lastBackup);
        }
      }
      return tableTimestampMap;
    }
  }

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in backup system table. This API is used by
   * BackupLogCleaner
   * @return true, if at least one session exists in backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    boolean result = false;
    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      if (scanner.next() != null) {
        result = true;
      }
      return result;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in a backup set
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.stream(tables).map(TableName::valueOf).collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
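   * <p>
   * For example (a sketch; set and table names are illustrative):
   * </p>
   * <pre>{@code
   * systemTable.addToBackupSet("nightly", new String[] { "ns1:t1", "ns1:t2" });
   * }</pre>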
   * @param name      set name
   * @param newTables list of table names to add
   * @throws IOException if a table operation fails
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
    return tables.toArray(new String[0]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get descriptor of the backup system table for bulk loaded data
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
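    // The stop row is the prefix with its last byte incremented: an exclusive upper bound that
    // covers exactly the rows whose keys start with BACKUP_INFO_PREFIX.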
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> { RS -> ts } map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

1503  /*
1504   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
1505   */
1506  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
1507    Map<byte[], List<Path>> finalPaths) {
1508    List<Put> puts = new ArrayList<>();
1509    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
1510      for (Path path : entry.getValue()) {
1511        String file = path.toString();
1512        int lastSlash = file.lastIndexOf("/");
1513        String filename = file.substring(lastSlash + 1);
1514        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1515          Bytes.toString(region), BLK_LD_DELIM, filename));
1516        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1517        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
1518        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1519        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
1520        puts.add(put);
1521        LOG
1522          .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
1523      }
1524    }
1525    return puts;
1526  }
1527
1528  public static void snapshot(Connection conn) throws IOException {
1529    try (Admin admin = conn.getAdmin()) {
1530      Configuration conf = conn.getConfiguration();
1531      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
1532    }
1533  }
1534
1535  public static void restoreFromSnapshot(Connection conn) throws IOException {
1536    Configuration conf = conn.getConfiguration();
1537    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
1538    try (Admin admin = conn.getAdmin()) {
1539      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1540      if (snapshotExists(admin, snapshotName)) {
1541        admin.disableTable(BackupSystemTable.getTableName(conf));
1542        admin.restoreSnapshot(snapshotName);
1543        admin.enableTable(BackupSystemTable.getTableName(conf));
1544        LOG.debug("Done restoring backup system table");
1545      } else {
1546        // Snapshot does not exists, i.e completeBackup failed after
1547        // deleting backup system table snapshot
1548        // In this case we log WARN and proceed
1549        LOG.warn(
1550          "Could not restore backup system table. Snapshot " + snapshotName + " does not exists.");
1551      }
1552    }
1553  }
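
  /*
   * Illustrative sketch, not part of the original API: the snapshot helpers in this class are
   * meant to bracket a backup session. A caller snapshots the system table first, restores from
   * the snapshot if the session fails, and deletes the snapshot once it is no longer needed. The
   * "failed" flag below is a hypothetical stand-in for the outcome of the session.
   */
  @SuppressWarnings("unused")
  private static void snapshotLifecycleSketch(Connection conn, boolean failed) throws IOException {
    snapshot(conn); // capture the current state of the backup system table
    // ... backup session runs here ...
    if (failed) {
      restoreFromSnapshot(conn); // roll the system table back to the pre-session state
    }
    deleteSnapshot(conn); // discard the snapshot
  }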

  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
    List<SnapshotDescription> list = admin.listSnapshots();
    for (SnapshotDescription desc : list) {
      if (desc.getName().equals(snapshotName)) {
        return true;
      }
    }
    return false;
  }

  public static boolean snapshotExists(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }

  public static void deleteSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Done deleting backup system table snapshot");
      } else {
        LOG.error("Snapshot " + snapshotName + " does not exist");
      }
    }
  }

  /**
   * Creates Puts for bulk-load files staged (prepared) by LoadIncrementalHFiles
   */
  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
    final List<Pair<Path, Path>> pairs) {
    List<Put> puts = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      Path path = pair.getSecond();
      String file = path.toString();
      int lastSlash = file.lastIndexOf("/");
      String filename = file.substring(lastSlash + 1);
      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
        Bytes.toString(region), BLK_LD_DELIM, filename));
      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
      puts.add(put);
      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
    }
    return puts;
  }

  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
    List<Delete> lstDels = new ArrayList<>(lst.size());
    for (TableName table : lst) {
      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
      del.addFamily(BackupSystemTable.META_FAMILY);
      lstDels.add(del);
    }
    return lstDels;
  }

  private Put createPutForDeleteOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(DELETE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  private Delete createDeleteForBackupDeleteOperation() {
    Delete delete = new Delete(DELETE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForDeleteOperation() {
    Get get = new Get(DELETE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

  public void startDeleteOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForDeleteOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishDeleteOperation() throws IOException {
    LOG.trace("Finish delete operation for backup ids");

    Delete delete = createDeleteForBackupDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
    LOG.trace("Get delete operation for backup ids");

    Get get = createGetForDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }
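
  /*
   * Illustrative sketch, not part of the original API: the delete-operation marker row stores its
   * backup ids as one comma-joined string, and the getter splits that string back apart. A round
   * trip of the encoding, using hypothetical backup ids:
   */
  @SuppressWarnings("unused")
  private static void deleteOperationEncodingSketch() {
    String[] ids = { "backup_001", "backup_002" }; // hypothetical backup ids
    byte[] encoded = Bytes.toBytes(StringUtils.join(ids, ",")); // what the Put stores
    String[] decoded = Splitter.on(',').splitToStream(new String(encoded, StandardCharsets.UTF_8))
      .toArray(String[]::new); // what the Get recovers
    assert Arrays.equals(ids, decoded);
  }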

  private Put createPutForMergeOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  public boolean isMergeInProgress() throws IOException {
    Get get = new Get(MERGE_OP_ROW);
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      return !res.isEmpty();
    }
  }

  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, PATH_COL, value);
    return put;
  }

  private Delete createDeleteForBackupMergeOperation() {
    Delete delete = new Delete(MERGE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForMergeOperation() {
    Get get = new Get(MERGE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

  public void startMergeOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForMergeOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
    }
    Put put = createPutForUpdateTablesForMerge(tables);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishMergeOperation() throws IOException {
    LOG.trace("Finish merge operation for backup ids");

    Delete delete = createDeleteForBackupMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
    LOG.trace("Get backup ids for merge operation");

    Get get = createGetForMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
        .toArray(String[]::new);
    }
  }

  static Scan createScanForOrigBulkLoadedFiles(TableName table) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
  }

  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
    Iterator<String> i = parts.iterator();
    int idx = 3;
    if (parts.size() == 4) {
      // the table is in the default namespace
      idx = 2;
    }
    String region = Iterators.get(i, idx);
    LOG.debug("bulk row string " + rowStr + " region " + region);
    return region;
  }
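
  /*
   * Illustrative sketch, not part of the original API, assuming BLK_LD_DELIM is the ":" shown in
   * the format comments above: for a default-namespace table the row splits into
   * [bulk, table, region, file], so the table name sits at index 1 and the region at index 2.
   */
  @SuppressWarnings("unused")
  private static void origBulkLoadRowParsingSketch() {
    // hypothetical row key for a default-namespace table
    String row = "bulk" + BLK_LD_DELIM + "t1" + BLK_LD_DELIM + "region1" + BLK_LD_DELIM + "file1";
    assert "t1".equals(getTableNameFromOrigBulkLoadRow(row));
    assert "region1".equals(getRegionNameFromOrigBulkLoadRow(row)); // parts.size() == 4 -> idx 2
  }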

  /**
   * Used to query bulk-loaded HFiles which have been copied by incremental backup
   * @param backupId the backup id. It can be null when querying for all tables
   * @return the Scan object
   */
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Creates Put to register a bulk-loaded file copied by incremental backup
   */
  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
    long ts, int idx) {
    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
    return put;
  }

  /**
   * Creates Scan operation to load backup set list
   * @return scan operation
   */
  private Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation to load backup set content
   * @param name backup set's name
   * @return get operation
   */
  private Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Delete operation to delete backup set content
   * @param name backup set's name
   * @return delete operation
   */
  private Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates Put operation to update backup set content
   * @param name   backup set's name
   * @param tables list of tables
   * @return put operation
   */
  private Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
    return put;
  }

  private byte[] convertToByteArray(String[] tables) {
    return Bytes.toBytes(StringUtils.join(tables, ","));
  }

  /**
   * Converts cell to backup set list.
   * @param current current cell
   * @return backup set as array of table names
   */
  private String[] cellValueToBackupSet(Cell current) {
    byte[] data = CellUtil.cloneValue(current);
    if (!ArrayUtils.isEmpty(data)) {
      return Bytes.toString(data).split(",");
    }
    return new String[0];
  }

  /**
   * Converts cell key to backup set name.
   * @param current current cell
   * @return backup set name
   */
  private String cellKeyToBackupSetName(Cell current) {
    byte[] data = CellUtil.cloneRow(current);
    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
  }

  private static byte[] rowkey(String s, String... other) {
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return Bytes.toBytes(sb.toString());
  }
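
  /*
   * Illustrative sketch, not part of the original API: rowkey() simply concatenates its string
   * parts, so the NULL delimiter written between the backup root and the trailing component is
   * what lets helpers such as getTableNameForReadLogTimestampMap recover that component with
   * lastIndexOf. A round trip with hypothetical values:
   */
  @SuppressWarnings("unused")
  private static void rowkeyDelimiterSketch() {
    String backupRoot = "hdfs://cluster/backup"; // hypothetical backup root
    String tableName = "t1"; // hypothetical table name
    byte[] row = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, tableName);
    String s = Bytes.toString(row);
    assert tableName.equals(s.substring(s.lastIndexOf(NULL) + 1));
  }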

  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
    if (!admin.isTableEnabled(tableName)) {
      try {
        admin.enableTable(tableName);
      } catch (TableNotDisabledException ignored) {
        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
      }
    }
  }
}