/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 * Backup system table schema:<br>
 * <p>
 * <ol>
 * <li>Backup sessions: rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>Backup start code: rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>Incremental backup set: rowkey = "incrbackupset:" + backupRoot; column = "meta:" + tablename
 * of included table; value = empty</li>
 * <li>Table-RS-timestamp map: rowkey = "trslm:" + backupRoot + table_name; value = map[RS -> last
 * WAL timestamp]</li>
 * <li>RS - WAL ts map: rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>WALs recorded: rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ol>
 * </p>
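 * <p>
 * A minimal usage sketch, assuming an open {@link org.apache.hadoop.hbase.client.Connection}
 * named {@code conn} and a hypothetical backup id {@code "backup_123"}:
 *
 * <pre>
 * try (BackupSystemTable systemTable = new BackupSystemTable(conn)) {
 *   BackupInfo info = systemTable.readBackupInfo("backup_123"); // null if the id is unknown
 *   List&lt;BackupInfo&gt; history = systemTable.getBackupHistory();
 * }
 * </pre>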
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge, etc.) from
   * the activity of the RegionObserver, which tracks the bulk loading process; see
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}.
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        createSystemTable(admin, backupHTD);
      }
      ensureTableEnabled(admin, tableName);
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        createSystemTable(admin, blHTD);
      }
      ensureTableEnabled(admin, bulkLoadTableName);
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void createSystemTable(Admin admin, TableDescriptor descriptor) throws IOException {
    try {
      admin.createTable(descriptor);
    } catch (TableExistsException e) {
      // swallow because this class is initialized in concurrent environments (e.g. bulkloads),
      // so it may be subject to race conditions where one caller succeeds in creating the
      // table and others fail because it now exists
      LOG.debug("Table {} already exists, ignoring", descriptor.getTableName(), e);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      try {
        admin.createNamespace(ns);
      } catch (NamespaceExistException e) {
        // swallow because this class is initialized in concurrent environments (e.g. bulkloads),
        // so it may be subject to race conditions where one caller succeeds in creating the
        // namespace and others fail because it now exists
        LOG.debug("Namespace {} already exists, ignoring", ns.getName(), e);
      }
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    final long timeoutMs = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not yet present and available, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException().initCause(e);
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > timeoutMs) {
        throw new IOException("Backup system table " + tableName
          + " did not become available after " + timeoutMs + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates the status (state) of a backup session in the backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of row keys to paths of bulk loaded hfiles
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /*
   * Used during restore
   * @param backupId the backup Id
   * @param sTableList List of tables
   * @return array of Map of family to List of Paths
   */
  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
    throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    @SuppressWarnings("unchecked")
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName tbl = null;
        byte[] fam = null;
        String path = null;
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = CellUtil.cloneValue(cell);
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
        if (srcIdx == -1) {
          // this table is not in the requested list
          continue;
        }
        if (mapForSrc[srcIdx] == null) {
          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        List<Path> files = mapForSrc[srcIdx].computeIfAbsent(fam, k -> new ArrayList<>());
        files.add(new Path(path));
        if (LOG.isDebugEnabled()) {
          LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
        }
      }

      return mapForSrc;
    }
  }

  /**
   * Deletes backup status from the backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /**
   * Registers a bulk load.
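   * <p>
   * A minimal sketch of how a caller might build the arguments; {@code systemTable},
   * {@code encodedRegionNameBytes} and the paths are hypothetical names:
   *
   * <pre>
   * Map&lt;byte[], List&lt;Path&gt;&gt; cfToHfilePath = new TreeMap&lt;&gt;(Bytes.BYTES_COMPARATOR);
   * cfToHfilePath.put(Bytes.toBytes("cf"), Arrays.asList(new Path("/staging/cf/hfile-1")));
   * systemTable.registerBulkLoad(TableName.valueOf("ns:t1"), encodedRegionNameBytes,
   *   cfToHfilePath);
   * </pre>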
   * @param tableName     table name
   * @param region        the region receiving hfile
   * @param cfToHfilePath column family and associated hfiles
   */
  public void registerBulkLoad(TableName tableName, byte[] region,
    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
        cfToHfilePath.size());
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
      bufferedMutator.mutate(puts);
      LOG.debug("Written {} rows for bulk load of table {}", puts.size(), tableName);
    }
  }

  /**
   * Removes entries from the table that tracks all bulk loaded hfiles.
   * @param rows the row keys of the entries to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      List<Delete> deletes = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        deletes.add(del);
        LOG.debug("Deleting bulk load entry with key: {}", Bytes.toString(row));
      }
      bufferedMutator.mutate(deletes);
      LOG.debug("Deleted {} bulk load entries.", rows.size());
    }
  }

  /**
   * Reads the rows from the backup table that records bulk loaded hfiles
   * @param tableList list of table names
   */
  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
    List<BulkLoad> result = new ArrayList<>();
    for (TableName table : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          String fam = null;
          String path = null;
          String region = null;
          byte[] row = null;
          for (Cell cell : res.listCells()) {
            row = CellUtil.cloneRow(cell);
            String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
            if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
                BackupSystemTable.FAM_COL.length) == 0
            ) {
              fam = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
                BackupSystemTable.PATH_COL.length) == 0
            ) {
              path = Bytes.toString(CellUtil.cloneValue(cell));
            }
          }
          result.add(new BulkLoad(table, region, fam, path, row));
469          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
        }
      }
    }
    return result;
  }

  /*
   * @param sTableList List of tables
   * @param maps array of Map of family to List of Paths
   * @param backupId the backup Id
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
    String backupId) throws IOException {
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
      long ts = EnvironmentEdgeManager.currentTime();
      int cnt = 0;
      List<Put> puts = new ArrayList<>();
      for (int idx = 0; idx < maps.length; idx++) {
        Map<byte[], List<Path>> map = maps[idx];
        TableName tn = sTableList.get(idx);

        if (map == null) {
          continue;
        }

        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
          byte[] fam = entry.getKey();
          List<Path> paths = entry.getValue();
          for (Path p : paths) {
            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
              ts, cnt++);
            puts.add(put);
          }
        }
      }
      if (!puts.isEmpty()) {
        bufferedMutator.mutate(puts);
      }
    }
  }

  /**
   * Reads backup status object (instance of backup info) from the backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Reads the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in HBase or the stored value has length 0. Either case indicates that no
   * backup has completed successfully so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of the last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val, StandardCharsets.UTF_8);
    }
  }

  /**
   * Write the start code (timestamp) to the backup system table.
   * @param startCode  start code; must not be null
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Starts an exclusive backup operation. Exclusive operations are: create, delete, merge.
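   * <p>
   * A minimal sketch of the intended call pattern, assuming {@code systemTable} is an open
   * instance:
   *
   * <pre>
   * systemTable.startBackupExclusiveOperation();
   * try {
   *   // run the exclusive operation (create / delete / merge)
   * } finally {
   *   systemTable.finishBackupExclusiveOperation();
   * }
   * </pre>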
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last log roll result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last log roll result (timestamp) to the backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last log roll result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get backup history (in descending order by time)
   * @param onlyCompleted true, if only successfully completed sessions should be returned
   * @return history of BackupInfo records
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backup history
   * @return list of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n number of records; if n == -1, the limit is ignored
   * @return list of records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }

  /**
   * Get backup history records filtered by a list of filters.
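   * <p>
   * For example, a caller could keep only FULL backups with a filter lambda (illustrative sketch;
   * {@code systemTable} is an open instance):
   *
   * <pre>
   * List&lt;BackupInfo&gt; fullOnly =
   *   systemTable.getBackupHistory(-1, info -&gt; info.getType() == BackupType.FULL);
   * </pre>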
   * @param n       max number of records; if n == -1, the limit is ignored
   * @param filters list of filters
   * @return backup records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }

      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }

  /**
   * Retrieve all table names that are part of any known backup
   */
  public Set<TableName> getTablesIncludedInBackups() throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      // Incremental backups have the same tables as the preceding full backups
      if (info.getType() == BackupType.FULL) {
        names.addAll(info.getTableNames());
      }
    }
    return names;
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    history.removeIf(info -> !backupRoot.equals(info.getBackupRootDir()));
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  /**
   * Goes through all backup history corresponding to the provided root folder, and collects all
   * backup info mentioning each of the provided tables.
   * @param set        the tables for which to collect the {@code BackupInfo}
   * @param backupRoot backup destination path to retrieve backup history for
   * @return a map containing (a subset of) the provided {@code TableName}s, mapped to a list of at
   *         least one {@code BackupInfo}
   * @throws IOException if getting the backup history fails
   */
  public Map<TableName, List<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, List<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          List<BackupInfo> list =
            tableHistoryMap.computeIfAbsent(tableName, k -> new ArrayList<>());
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to the backup system table after a
   * successful full or incremental backup. The saved timestamp is that of the last WAL file
   * already backed up.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(tableName)) {
      bufferedMutator.mutate(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of timestamps. The info is stored for each table as a serialized map of
   * RS -> timestamp
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName; value: map of RegionServer name
   *         to previous timestamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null || data.length == 0) {
          throw new IOException("Data of last backup data from backup system table "
            + "is empty. Create a backup first.");
        }
        HashMap<String, Long> lastBackup =
          fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
        tableTimestampMap.put(tn, lastBackup);
      }
      return tableTimestampMap;
    }
  }

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in the backup system table. This API is used by
   * BackupLogCleaner
   * @return true if at least one session exists in the backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    boolean result = false;
    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      if (scanner.next() != null) {
        result = true;
      }
      return result;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in a backup set, or null if the set does not exist
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.stream(tables).map(TableName::valueOf).collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
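   * <p>
   * For example (hypothetical set and table names; {@code systemTable} is an open instance):
   *
   * <pre>
   * systemTable.addToBackupSet("nightly", new String[] { "ns:t1", "ns:t2" });
   * </pre>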
   * @param name      set name
   * @param newTables list of tables to add
   * @throws IOException if a table operation fails
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(tables::remove);
    return tables.toArray(new String[0]);
  }


  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(" Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get the descriptor of the backup system table for bulk loaded data
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    int ttl = conf.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
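    // Compute an exclusive stop row by incrementing the last byte of the prefix, so the scan
    // returns exactly the rows whose keys start with BACKUP_INFO_PREFIX.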
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> {RS -> ts} map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server    server name
   * @param timestamp log roll result (timestamp)
   * @return put operation
   */
  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
    String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
      Bytes.toBytes(timestamp));
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    scan.setStartStopRowForPrefixScan(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL));
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Puts for bulk loads.
   */
1449  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
1450    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
1451    List<Put> puts = new ArrayList<>();
1452    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
1453      for (Path path : entry.getValue()) {
1454        String file = path.toString();
1455        int lastSlash = file.lastIndexOf("/");
1456        String filename = file.substring(lastSlash + 1);
1457        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
1458          Bytes.toString(region), BLK_LD_DELIM, filename));
1459        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
1460        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
1461        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
1462        puts.add(put);
1463        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
1464      }
1465    }
1466    return puts;
1467  }
1468
1469  public static void snapshot(Connection conn) throws IOException {
1470    try (Admin admin = conn.getAdmin()) {
1471      Configuration conf = conn.getConfiguration();
1472      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
1473    }
1474  }
1475
1476  public static void restoreFromSnapshot(Connection conn) throws IOException {
1477    Configuration conf = conn.getConfiguration();
1478    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
1479    try (Admin admin = conn.getAdmin()) {
1480      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1481      if (snapshotExists(admin, snapshotName)) {
1482        admin.disableTable(BackupSystemTable.getTableName(conf));
1483        admin.restoreSnapshot(snapshotName);
1484        admin.enableTable(BackupSystemTable.getTableName(conf));
1485        LOG.debug("Done restoring backup system table");
1486      } else {
        // Snapshot does not exist, i.e. completeBackup failed after
        // deleting the backup system table snapshot.
        // In this case we log a WARN and proceed.
        LOG.warn(
          "Could not restore backup system table. Snapshot " + snapshotName + " does not exist.");
1492      }
1493    }
1494  }
1495
1496  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
1497    List<SnapshotDescription> list = admin.listSnapshots();
1498    for (SnapshotDescription desc : list) {
1499      if (desc.getName().equals(snapshotName)) {
1500        return true;
1501      }
1502    }
1503    return false;
1504  }
1505
  public static boolean snapshotExists(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }
1509
1510  public static void deleteSnapshot(Connection conn) throws IOException {
1511    Configuration conf = conn.getConfiguration();
1512    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
1513    try (Admin admin = conn.getAdmin()) {
1514      String snapshotName = BackupSystemTable.getSnapshotName(conf);
1515      if (snapshotExists(admin, snapshotName)) {
1516        admin.deleteSnapshot(snapshotName);
1517        LOG.debug("Done deleting backup system table snapshot");
1518      } else {
1519        LOG.error("Snapshot " + snapshotName + " does not exists");
1520      }
1521    }
1522  }
1523
1524  private Put createPutForDeleteOperation(String[] backupIdList) {
1525    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1526    Put put = new Put(DELETE_OP_ROW);
1527    put.addColumn(META_FAMILY, FAM_COL, value);
1528    return put;
1529  }
1530
1531  private Delete createDeleteForBackupDeleteOperation() {
1532    Delete delete = new Delete(DELETE_OP_ROW);
1533    delete.addFamily(META_FAMILY);
1534    return delete;
1535  }
1536
1537  private Get createGetForDeleteOperation() {
1538    Get get = new Get(DELETE_OP_ROW);
1539    get.addFamily(META_FAMILY);
1540    return get;
1541  }
1542
1543  public void startDeleteOperation(String[] backupIdList) throws IOException {
1544    if (LOG.isTraceEnabled()) {
1545      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
1546    }
1547    Put put = createPutForDeleteOperation(backupIdList);
1548    try (Table table = connection.getTable(tableName)) {
1549      table.put(put);
1550    }
1551  }
1552
1553  public void finishDeleteOperation() throws IOException {
1554    LOG.trace("Finsih delete operation for backup ids");
1555
1556    Delete delete = createDeleteForBackupDeleteOperation();
1557    try (Table table = connection.getTable(tableName)) {
1558      table.delete(delete);
1559    }
1560  }
1561
1562  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
1563    LOG.trace("Get delete operation for backup ids");
1564
1565    Get get = createGetForDeleteOperation();
1566    try (Table table = connection.getTable(tableName)) {
1567      Result res = table.get(get);
1568      if (res.isEmpty()) {
1569        return null;
1570      }
1571      Cell cell = res.listCells().get(0);
1572      byte[] val = CellUtil.cloneValue(cell);
1573      if (val.length == 0) {
1574        return null;
1575      }
1576      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1577        .toArray(String[]::new);
1578    }
1579  }
1580
1581  private Put createPutForMergeOperation(String[] backupIdList) {
1582    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
1583    Put put = new Put(MERGE_OP_ROW);
1584    put.addColumn(META_FAMILY, FAM_COL, value);
1585    return put;
1586  }
1587
1588  public boolean isMergeInProgress() throws IOException {
1589    Get get = new Get(MERGE_OP_ROW);
1590    try (Table table = connection.getTable(tableName)) {
1591      Result res = table.get(get);
1592      return !res.isEmpty();
1593    }
1594  }
1595
1596  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
1597    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
1598    Put put = new Put(MERGE_OP_ROW);
1599    put.addColumn(META_FAMILY, PATH_COL, value);
1600    return put;
1601  }
1602
1603  private Delete createDeleteForBackupMergeOperation() {
1604    Delete delete = new Delete(MERGE_OP_ROW);
1605    delete.addFamily(META_FAMILY);
1606    return delete;
1607  }
1608
1609  private Get createGetForMergeOperation() {
1610    Get get = new Get(MERGE_OP_ROW);
1611    get.addFamily(META_FAMILY);
1612    return get;
1613  }
1614
1615  public void startMergeOperation(String[] backupIdList) throws IOException {
1616    if (LOG.isTraceEnabled()) {
1617      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
1618    }
1619    Put put = createPutForMergeOperation(backupIdList);
1620    try (Table table = connection.getTable(tableName)) {
1621      table.put(put);
1622    }
1623  }
1624
1625  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
1626    if (LOG.isTraceEnabled()) {
1627      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
1628    }
1629    Put put = createPutForUpdateTablesForMerge(tables);
1630    try (Table table = connection.getTable(tableName)) {
1631      table.put(put);
1632    }
1633  }
1634
1635  public void finishMergeOperation() throws IOException {
1636    LOG.trace("Finish merge operation for backup ids");
1637
1638    Delete delete = createDeleteForBackupMergeOperation();
1639    try (Table table = connection.getTable(tableName)) {
1640      table.delete(delete);
1641    }
1642  }
1643
1644  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
1645    LOG.trace("Get backup ids for merge operation");
1646
1647    Get get = createGetForMergeOperation();
1648    try (Table table = connection.getTable(tableName)) {
1649      Result res = table.get(get);
1650      if (res.isEmpty()) {
1651        return null;
1652      }
1653      Cell cell = res.listCells().get(0);
1654      byte[] val = CellUtil.cloneValue(cell);
1655      if (val.length == 0) {
1656        return null;
1657      }
1658      return Splitter.on(',').splitToStream(new String(val, StandardCharsets.UTF_8))
1659        .toArray(String[]::new);
1660    }
1661  }
1662
1663  static Scan createScanForOrigBulkLoadedFiles(TableName table) {
1664    Scan scan = new Scan();
1665    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
1666    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1667    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1668    scan.withStartRow(startRow);
1669    scan.withStopRow(stopRow);
1670    scan.addFamily(BackupSystemTable.META_FAMILY);
1671    scan.readVersions(1);
1672    return scan;
1673  }
1674
1675  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
1676    // format is bulk : namespace : table : region : file
1677    return Iterators.get(Splitter.onPattern(BLK_LD_DELIM).split(rowStr).iterator(), 1);
1678  }
1679
1680  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
1681    // format is bulk : namespace : table : region : file
1682    List<String> parts = Splitter.onPattern(BLK_LD_DELIM).splitToList(rowStr);
1683    Iterator<String> i = parts.iterator();
1684    int idx = 3;
1685    if (parts.size() == 4) {
1686      // the table is in default namespace
1687      idx = 2;
1688    }
1689    String region = Iterators.get(i, idx);
1690    LOG.debug("bulk row string " + rowStr + " region " + region);
1691    return region;
1692  }
1693
1694  /*
1695   * Used to query bulk loaded hfiles which have been copied by incremental backup
1696   * @param backupId the backup Id. It can be null when querying for all tables
1697   * @return the Scan object
1698   */
1699  static Scan createScanForBulkLoadedFiles(String backupId) {
1700    Scan scan = new Scan();
1701    byte[] startRow =
1702      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
1703    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1704    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1705    scan.withStartRow(startRow);
1706    scan.withStopRow(stopRow);
1707    scan.addFamily(BackupSystemTable.META_FAMILY);
1708    scan.readVersions(1);
1709    return scan;
1710  }
1711
1712  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
1713    long ts, int idx) {
1714    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
1715    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
1716    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
1717    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
1718    return put;
1719  }
1720
1721  /**
1722   * Creates Scan operation to load backup set list
1723   * @return scan operation
1724   */
1725  private Scan createScanForBackupSetList() {
1726    Scan scan = new Scan();
1727    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
1728    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
1729    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
1730    scan.withStartRow(startRow);
1731    scan.withStopRow(stopRow);
1732    scan.addFamily(BackupSystemTable.META_FAMILY);
1733    return scan;
1734  }
1735
1736  /**
1737   * Creates Get operation to load backup set content
1738   * @return get operation
1739   */
1740  private Get createGetForBackupSet(String name) {
1741    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
1742    get.addFamily(BackupSystemTable.META_FAMILY);
1743    return get;
1744  }
1745
1746  /**
1747   * Creates Delete operation to delete backup set content
1748   * @param name backup set's name
1749   * @return delete operation
1750   */
1751  private Delete createDeleteForBackupSet(String name) {
1752    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
1753    del.addFamily(BackupSystemTable.META_FAMILY);
1754    return del;
1755  }
1756
1757  /**
1758   * Creates Put operation to update backup set content
1759   * @param name   backup set's name
1760   * @param tables list of tables
1761   * @return put operation
1762   */
1763  private Put createPutForBackupSet(String name, String[] tables) {
1764    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
1765    byte[] value = convertToByteArray(tables);
1766    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
1767    return put;
1768  }
1769
1770  private byte[] convertToByteArray(String[] tables) {
1771    return Bytes.toBytes(StringUtils.join(tables, ","));
1772  }
1773
1774  /**
1775   * Converts cell to backup set list.
1776   * @param current current cell
1777   * @return backup set as array of table names
1778   */
1779  private String[] cellValueToBackupSet(Cell current) {
1780    byte[] data = CellUtil.cloneValue(current);
1781    if (!ArrayUtils.isEmpty(data)) {
1782      return Bytes.toString(data).split(",");
1783    }
1784    return new String[0];
1785  }
1786
1787  /**
1788   * Converts cell key to backup set name.
1789   * @param current current cell
1790   * @return backup set name
1791   */
1792  private String cellKeyToBackupSetName(Cell current) {
1793    byte[] data = CellUtil.cloneRow(current);
1794    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
1795  }
1796
1797  private static byte[] rowkey(String s, String... other) {
1798    StringBuilder sb = new StringBuilder(s);
1799    for (String ss : other) {
1800      sb.append(ss);
1801    }
1802    return Bytes.toBytes(sb.toString());
1803  }
1804
1805  private static void ensureTableEnabled(Admin admin, TableName tableName) throws IOException {
1806    if (!admin.isTableEnabled(tableName)) {
1807      try {
1808        admin.enableTable(tableName);
1809      } catch (TableNotDisabledException ignored) {
1810        LOG.info("Table {} is not disabled, ignoring enable request", tableName);
1811      }
1812    }
1813  }
1814}