/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class provides an API to access the backup system table.<br>
 * Backup system table schema:<br>
 * <p>
 * <ul>
 * <li>1. Backup sessions rowkey = "session:" + backupId; value = serialized BackupInfo</li>
 * <li>2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode</li>
 * <li>3. Incremental backup set rowkey = "incrbackupset:" + backupRoot; value = [list of tables]</li>
 * <li>4. Table-RS-timestamp map rowkey = "trslm:" + backupRoot + table_name; value = map[RS -> last
 * WAL timestamp]</li>
 * <li>5. RS - WAL ts map rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp</li>
 * <li>6. WALs recorded rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file
 * name</li>
 * </ul>
 * </p>
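 * <p>
 * Illustrative usage (a sketch; assumes an open cluster {@link Connection} named
 * {@code connection} and a hypothetical backup id):
 * </p>
 *
 * <pre>{@code
 * try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
 *   BackupInfo info = systemTable.readBackupInfo("backup_12345"); // hypothetical id
 *   if (info != null) {
 *     System.out.println(info.getState());
 *   }
 * }
 * }</pre>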
 */
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);

  static class WALItem {
    String backupId;
    String walFile;
    String backupRoot;

    WALItem(String backupId, String walFile, String backupRoot) {
      this.backupId = backupId;
      this.walFile = walFile;
      this.backupRoot = backupRoot;
    }

    public String getBackupId() {
      return backupId;
    }

    public String getWalFile() {
      return walFile;
    }

    public String getBackupRoot() {
      return backupRoot;
    }

    @Override
    public String toString() {
      return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile;
    }
  }

  /**
   * Backup system table (main) name
   */
  private TableName tableName;

  /**
   * Backup system table name for bulk loaded files. We keep all bulk loaded file references in a
   * separate table because we have to isolate general backup operations (create, merge etc.) from
   * the activity of the RegionObserver that controls the bulk loading process:
   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
   */
  private TableName bulkLoadTableName;

  /**
   * Stores backup sessions (contexts)
   */
  final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session");
  /**
   * Stores other meta
   */
  final static byte[] META_FAMILY = Bytes.toBytes("meta");
  final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk");
  /**
   * Connection to HBase cluster, shared among all instances
   */
  private final Connection connection;

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:");
  private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c");

  private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes");
  private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");

  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";

  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = Bytes.toBytes(BULK_LOAD_PREFIX);
  private final static byte[] DELETE_OP_ROW = Bytes.toBytes("delete_op_row");
  private final static byte[] MERGE_OP_ROW = Bytes.toBytes("merge_op_row");

  final static byte[] TBL_COL = Bytes.toBytes("tbl");
  final static byte[] FAM_COL = Bytes.toBytes("fam");
  final static byte[] PATH_COL = Bytes.toBytes("path");
  final static byte[] STATE_COL = Bytes.toBytes("state");
  // the two states a bulk loaded file can be in
  final static byte[] BL_PREPARE = Bytes.toBytes("R");
  final static byte[] BL_COMMIT = Bytes.toBytes("D");

  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and ordinals
  private final static String BLK_LD_DELIM = ":";
  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string
  private final static String NULL = "\u0000";
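
  // Illustrative row key layouts built from the prefixes above (example values only):
  //   "session:backup_12345"                  - backup session info
  //   "startcode:hdfs://host:8020/backup"     - backup start code
  //   "incrbackupset:hdfs://host:8020/backup" - incremental backup table set
  //   "trslm:<backupRoot>" + NULL + "<table>" - table-RS log timestamp map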

  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
    Configuration conf = this.connection.getConfiguration();
    tableName = BackupSystemTable.getTableName(conf);
    bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    checkSystemTable();
  }

  private void checkSystemTable() throws IOException {
    try (Admin admin = connection.getAdmin()) {
      verifyNamespaceExists(admin);
      Configuration conf = connection.getConfiguration();
      if (!admin.tableExists(tableName)) {
        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
        admin.createTable(backupHTD);
      }
      if (!admin.tableExists(bulkLoadTableName)) {
        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
        admin.createTable(blHTD);
      }
      waitForSystemTable(admin, tableName);
      waitForSystemTable(admin, bulkLoadTableName);
    }
  }

  private void verifyNamespaceExists(Admin admin) throws IOException {
    String namespaceName = tableName.getNamespaceAsString();
    NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build();
    NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
    boolean exists = false;
    for (NamespaceDescriptor nsd : list) {
      if (nsd.getName().equals(ns.getName())) {
        exists = true;
        break;
      }
    }
    if (!exists) {
      admin.createNamespace(ns);
    }
  }

  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
    // Return fast if the table is available and avoid a log message
    if (admin.tableExists(tableName) && admin.isTableAvailable(tableName)) {
      return;
    }
    long TIMEOUT = 60000;
    long startTime = EnvironmentEdgeManager.currentTime();
    LOG.debug("Backup table {} is not yet present and available, waiting for it to become so",
      tableName);
    while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers can react to the interruption
        Thread.currentThread().interrupt();
      }
      if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
        throw new IOException(
          "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
      }
    }
    LOG.debug("Backup table {} exists and is available", tableName);
  }

  @Override
  public void close() {
    // do nothing
  }

  /**
   * Updates the status (state) of a backup session in the backup system table
   * @param info backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo info) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("update backup status in backup system table for: " + info.getBackupId()
        + " set status=" + info.getState());
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForBackupInfo(info);
      table.put(put);
    }
  }

  /*
   * @param backupId the backup Id
   * @return Map of rows to path of bulk loaded hfile
   */
  Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      while ((res = scanner.next()) != null) {
        res.advance();
        byte[] row = CellUtil.cloneRow(res.listCells().get(0));
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
          }
        }
      }
      return map;
    }
  }

  /*
   * Used during restore
   * @param backupId the backup Id
   * @param sTableList List of tables
   * @return array of Map of family to List of Paths
   */
  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
    throws IOException {
    Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
    try (Table table = connection.getTable(bulkLoadTableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res = null;
      while ((res = scanner.next()) != null) {
        res.advance();
        TableName tbl = null;
        byte[] fam = null;
        String path = null;
        for (Cell cell : res.listCells()) {
          if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
              BackupSystemTable.TBL_COL.length) == 0
          ) {
            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
              BackupSystemTable.FAM_COL.length) == 0
          ) {
            fam = CellUtil.cloneValue(cell);
          } else if (
            CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
              BackupSystemTable.PATH_COL.length) == 0
          ) {
            path = Bytes.toString(CellUtil.cloneValue(cell));
          }
        }
        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
        if (srcIdx == -1) {
          // the table is not among the requested tables
          continue;
        }
        if (mapForSrc[srcIdx] == null) {
          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        List<Path> files;
        if (!mapForSrc[srcIdx].containsKey(fam)) {
          files = new ArrayList<Path>();
          mapForSrc[srcIdx].put(fam, files);
        } else {
          files = mapForSrc[srcIdx].get(fam);
        }
        files.add(new Path(path));
        if (LOG.isDebugEnabled()) {
          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
        }
      }

      return mapForSrc;
    }
  }

  /**
   * Deletes backup status from the backup system table
   * @param backupId backup id
   * @throws IOException exception
   */
  public void deleteBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("delete backup status in backup system table for " + backupId);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupInfo(backupId);
      table.delete(del);
    }
  }

  /*
   * For postBulkLoadHFile() hook.
   * @param tabName table name
   * @param region the region receiving hfile
   * @param finalPaths family and associated hfiles
   */
  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
    Map<byte[], List<Path>> finalPaths) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
        + " entries");
    }
    try (Table table = connection.getTable(bulkLoadTableName)) {
      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
      table.put(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * For preCommitStoreFile() hook
   * @param tabName table name
   * @param region the region receiving hfile
   * @param family column family
   * @param pairs list of paths for hfiles
   */
  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
    final List<Pair<Path, Path>> pairs) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
    }
    try (Table table = connection.getTable(bulkLoadTableName)) {
      List<Put> puts =
        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
      table.put(puts);
      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
    }
  }

  /*
   * Removes rows recording bulk loaded hfiles from backup table
   * @param rows the rows to be deleted
   */
  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
    try (Table table = connection.getTable(bulkLoadTableName)) {
      List<Delete> lstDels = new ArrayList<>();
      for (byte[] row : rows) {
        Delete del = new Delete(row);
        lstDels.add(del);
        LOG.debug("deleting original bulk load row: " + Bytes.toString(row));
      }
      table.delete(lstDels);
      LOG.debug("deleted " + rows.size() + " original bulkload rows");
    }
  }

  /*
   * Reads the rows from backup table recording bulk loaded hfiles
   * @param tableList list of table names
   * @return The keys of the Map are table, region and column family. The Boolean value in each
   * pair indicates whether the hfile was recorded by the preCommitStoreFile hook (true).
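   *
   * An illustrative read of the returned structure (a sketch; systemTable is an assumed name):
   *   pair.getFirst()  : table -> region -> family -> list of (hfile path, prepared?)
   *   pair.getSecond() : raw row keys, suitable for a later deleteBulkLoadedRows() call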
   */
  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
    readBulkloadRows(List<TableName> tableList) throws IOException {

    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
    List<byte[]> rows = new ArrayList<>();
    for (TableName tTable : tableList) {
      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
      try (Table table = connection.getTable(bulkLoadTableName);
        ResultScanner scanner = table.getScanner(scan)) {
        Result res = null;
        while ((res = scanner.next()) != null) {
          res.advance();
          String fam = null;
          String path = null;
          boolean raw = false;
          byte[] row;
          String region = null;
          for (Cell cell : res.listCells()) {
            row = CellUtil.cloneRow(cell);
            rows.add(row);
            String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
            if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
                BackupSystemTable.FAM_COL.length) == 0
            ) {
              fam = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
                BackupSystemTable.PATH_COL.length) == 0
            ) {
              path = Bytes.toString(CellUtil.cloneValue(cell));
            } else if (
              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
                BackupSystemTable.STATE_COL.length) == 0
            ) {
              byte[] state = CellUtil.cloneValue(cell);
              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
                raw = true;
              } else {
                raw = false;
              }
            }
          }
          if (map.get(tTable) == null) {
            map.put(tTable, new HashMap<>());
            tblMap = map.get(tTable);
          }
          if (tblMap.get(region) == null) {
            tblMap.put(region, new HashMap<>());
          }
          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
          if (famMap.get(fam) == null) {
            famMap.put(fam, new ArrayList<>());
          }
          famMap.get(fam).add(new Pair<>(path, raw));
          LOG.debug("found orig " + path + " for family " + fam + " of region " + region);
        }
      }
    }
    return new Pair<>(map, rows);
  }

  /*
   * @param sTableList List of tables
   * @param maps array of Map of family to List of Paths
   * @param backupId the backup Id
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
    String backupId) throws IOException {
    try (Table table = connection.getTable(bulkLoadTableName)) {
      long ts = EnvironmentEdgeManager.currentTime();
      int cnt = 0;
      List<Put> puts = new ArrayList<>();
      for (int idx = 0; idx < maps.length; idx++) {
        Map<byte[], List<Path>> map = maps[idx];
        TableName tn = sTableList.get(idx);

        if (map == null) {
          continue;
        }

        for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
          byte[] fam = entry.getKey();
          List<Path> paths = entry.getValue();
          for (Path p : paths) {
            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
              ts, cnt++);
            puts.add(put);
          }
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
    }
  }

  /**
   * Reads backup status object (instance of backup info) from the backup system table
   * @param backupId backup id
   * @return Current status of backup session or null
   */
  public BackupInfo readBackupInfo(String backupId) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read backup status from backup system table for: " + backupId);
    }

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupInfo(backupId);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      return resultToBackupInfo(res);
    }
  }

  /**
   * Read the start code (timestamp) of the last successful backup. Returns null if there is no
   * start code stored in HBase or the stored value has length 0. Both cases indicate that no
   * successful backup has completed so far.
   * @param backupRoot directory path to backup destination
   * @return the timestamp of last successful backup
   * @throws IOException exception
   */
  public String readBackupStartCode(String backupRoot) throws IOException {
    LOG.trace("read backup start code from backup system table");

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForStartCode(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      return new String(val);
    }
  }

  /**
   * Write the start code (timestamp) to the backup system table. The start code must not be null.
   * @param startCode  start code
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write backup start code to backup system table " + startCode);
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartCode(startCode.toString(), backupRoot);
      table.put(put);
    }
  }

  /**
   * Exclusive operations are: create, delete, merge
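   * <p>
   * Illustrative acquire/release pattern (a sketch, not taken from the original code):
   * </p>
   *
   * <pre>{@code
   * systemTable.startBackupExclusiveOperation();
   * try {
   *   // run the create/delete/merge work here
   * } finally {
   *   systemTable.finishBackupExclusiveOperation();
   * }
   * }</pre>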
   * @throws IOException if a table operation fails or an active backup exclusive operation is
   *                     already underway
   */
  public void startBackupExclusiveOperation() throws IOException {
    LOG.debug("Start new backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStartBackupSession();
      // First try to put if row does not exist
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifNotExists().thenPut(put)
      ) {
        // Row exists, try to put if value == ACTIVE_SESSION_NO
        if (
          !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
            .ifEquals(ACTIVE_SESSION_NO).thenPut(put)
        ) {
          throw new ExclusiveOperationException();
        }
      }
    }
  }

  private Put createPutForStartBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
    return put;
  }

  public void finishBackupExclusiveOperation() throws IOException {
    LOG.debug("Finish backup exclusive operation");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForStopBackupSession();
      if (
        !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
          .ifEquals(ACTIVE_SESSION_YES).thenPut(put)
      ) {
        throw new IOException("There is no active backup exclusive operation");
      }
    }
  }

  private Put createPutForStopBackupSession() {
    Put put = new Put(ACTIVE_SESSION_ROW);
    put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
    return put;
  }

  /**
   * Get the Region Servers log information after the last log roll from backup system table.
   * @param backupRoot root directory path to backup
   * @return RS log info
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
    throws IOException {
    LOG.trace("read region server last roll log result from backup system table");

    Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      HashMap<String, Long> rsTimestampMap = new HashMap<>();
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String server = getServerNameForReadRegionServerLastLogRollResult(row);
        byte[] data = CellUtil.cloneValue(cell);
        rsTimestampMap.put(server, Bytes.toLong(data));
      }
      return rsTimestampMap;
    }
  }

  /**
   * Writes Region Server last roll log result (timestamp) to the backup system table
   * @param server     Region Server name
   * @param ts         last log timestamp
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
    throws IOException {
    LOG.trace("write region server last roll log result to backup system table");

    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
      table.put(put);
    }
  }

  /**
   * Get all completed backup information (in desc order by time)
   * @param onlyCompleted true, if only successfully completed sessions are requested
   * @return history info of BackupCompleteData
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
    LOG.trace("get backup history from backup system table");

    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
    ArrayList<BackupInfo> list = getBackupInfos(state);
    return BackupUtils.sortHistoryListDesc(list);
  }

  /**
   * Get all backups history
   * @return list of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }

  /**
   * Get first n backup history records
   * @param n number of records; if n == -1, all records are returned
   * @return list of records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getHistory(int n) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    if (n == -1 || history.size() <= n) {
      return history;
    }
    return Collections.unmodifiableList(history.subList(0, n));
  }

  /**
   * Get backup history records filtered by list of filters.
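   * <p>
   * For example (an illustrative sketch; assumes {@code BackupInfo.Filter} exposes a single
   * {@code apply(BackupInfo)} method, as used in this class):
   * </p>
   *
   * <pre>{@code
   * List<BackupInfo> completedFull = systemTable.getBackupHistory(10,
   *   info -> info.getType() == BackupType.FULL,
   *   info -> info.getState() == BackupState.COMPLETE);
   * }</pre>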
   * @param n       max number of records; if n == -1, all matching records are returned
   * @param filters list of filters
   * @return backup records
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
    if (filters.length == 0) {
      return getHistory(n);
    }

    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> result = new ArrayList<>();
    for (BackupInfo bi : history) {
      if (n >= 0 && result.size() == n) {
        break;
      }

      boolean passed = true;
      for (int i = 0; i < filters.length; i++) {
        if (!filters[i].apply(bi)) {
          passed = false;
          break;
        }
      }
      if (passed) {
        result.add(bi);
      }
    }
    return result;
  }

  /*
   * Retrieves TableNames for completed backups of a given type
   * @param type backup type
   * @return List of table names
   */
  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
    Set<TableName> names = new HashSet<>();
    List<BackupInfo> infos = getBackupHistory(true);
    for (BackupInfo info : infos) {
      if (info.getType() == type) {
        names.addAll(info.getTableNames());
      }
    }
    return new ArrayList<>(names);
  }

  /**
   * Get history for backup destination
   * @param backupRoot backup destination path
   * @return List of backup info
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
    ArrayList<BackupInfo> history = getBackupHistory(false);
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      BackupInfo info = iterator.next();
      if (!backupRoot.equals(info.getBackupRootDir())) {
        iterator.remove();
      }
    }
    return history;
  }

  /**
   * Get history for a table
   * @param name table name
   * @return history for a table
   * @throws IOException if getting the backup history fails
   */
  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
    List<BackupInfo> history = getBackupHistory();
    List<BackupInfo> tableHistory = new ArrayList<>();
    for (BackupInfo info : history) {
      List<TableName> tables = info.getTableNames();
      if (tables.contains(name)) {
        tableHistory.add(info);
      }
    }
    return tableHistory;
  }

  public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set,
    String backupRoot) throws IOException {
    List<BackupInfo> history = getBackupHistory(backupRoot);
    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = new HashMap<>();
    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
      BackupInfo info = iterator.next();
      if (!backupRoot.equals(info.getBackupRootDir())) {
        continue;
      }
      List<TableName> tables = info.getTableNames();
      for (TableName tableName : tables) {
        if (set.contains(tableName)) {
          ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
          if (list == null) {
            list = new ArrayList<>();
            tableHistoryMap.put(tableName, list);
          }
          list.add(info);
        }
      }
    }
    return tableHistoryMap;
  }

  /**
   * Get all backup sessions with a given state (in descending order by time)
   * @param state backup session state
   * @return history info of backup info objects
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
    LOG.trace("get backup infos from backup system table");

    Scan scan = createScanForBackupHistory();
    ArrayList<BackupInfo> list = new ArrayList<>();

    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        BackupInfo context = cellToBackupInfo(res.current());
        if (state != BackupState.ANY && context.getState() != state) {
          continue;
        }
        list.add(context);
      }
      return list;
    }
  }

  /**
   * Write the current timestamps for each regionserver to backup system table after a successful
   * full or incremental backup. The saved timestamp is of the last log file that was backed up
   * already.
   * @param tables        tables
   * @param newTimestamps timestamps
   * @param backupRoot    root directory path to backup
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Long> newTimestamps,
    String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("write RS log time stamps to backup system table for tables ["
        + StringUtils.join(tables, ",") + "]");
    }
    List<Put> puts = new ArrayList<>();
    for (TableName table : tables) {
      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
      Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot);
      puts.add(put);
    }
    try (Table table = connection.getTable(tableName)) {
      table.put(puts);
    }
  }

  /**
   * Read the timestamp for each region server log after the last successful backup. Each table has
   * its own set of the timestamps. The info is stored for each table as a concatenated string of
   * rs->timestamp
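   * <p>
   * Illustrative traversal of the returned map of maps (a sketch; {@code systemTable} and
   * {@code backupRoot} are assumed names):
   * </p>
   *
   * <pre>{@code
   * Map<TableName, Map<String, Long>> tsMap = systemTable.readLogTimestampMap(backupRoot);
   * tsMap.forEach((tbl, serverTs) -> serverTs
   *   .forEach((server, ts) -> System.out.println(tbl + " " + server + " " + ts)));
   * }</pre>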
   * @param backupRoot root directory path to backup
   * @return the timestamp for each region server. key: tableName value:
   *         RegionServer,PreviousTimeStamp
   * @throws IOException exception
   */
  public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
    }

    Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();

    Scan scan = createScanForReadLogTimestampMap(backupRoot);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      Result res;
      while ((res = scanner.next()) != null) {
        res.advance();
        Cell cell = res.current();
        byte[] row = CellUtil.cloneRow(cell);
        String tabName = getTableNameForReadLogTimestampMap(row);
        TableName tn = TableName.valueOf(tabName);
        byte[] data = CellUtil.cloneValue(cell);
        if (data == null) {
          throw new IOException(
            "Last backup data from backup system table is empty. Create a backup first.");
        }
        if (data.length > 0) {
          HashMap<String, Long> lastBackup =
            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
          tableTimestampMap.put(tn, lastBackup);
        }
      }
      return tableTimestampMap;
    }
  }

  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
    Map<String, Long> map) {
    BackupProtos.TableServerTimestamp.Builder tstBuilder =
      BackupProtos.TableServerTimestamp.newBuilder();
    tstBuilder
      .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));

    for (Entry<String, Long> entry : map.entrySet()) {
      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
      HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder();
      ServerName sn = ServerName.parseServerName(entry.getKey());
      snBuilder.setHostName(sn.getHostname());
      snBuilder.setPort(sn.getPort());
      builder.setServerName(snBuilder.build());
      builder.setTimestamp(entry.getValue());
      tstBuilder.addServerTimestamp(builder.build());
    }

    return tstBuilder.build();
  }

  private HashMap<String, Long>
    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {

    HashMap<String, Long> map = new HashMap<>();
    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
    for (BackupProtos.ServerTimestamp st : list) {
      ServerName sn =
        org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName());
      map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp());
    }
    return map;
  }

  /**
   * Return the current tables covered by incremental backup.
   * @param backupRoot root directory path to backup
   * @return set of tableNames
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
    LOG.trace("get incremental backup table set from backup system table");

    TreeSet<TableName> set = new TreeSet<>();

    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForIncrBackupTableSet(backupRoot);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return set;
      }
      List<Cell> cells = res.listCells();
      for (Cell cell : cells) {
        // qualifier = table name - we use table names as qualifiers
        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
      }
      return set;
    }
  }

  /**
   * Add tables to global incremental backup set
   * @param tables     set of tables
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
    throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
        + " tables [" + StringUtils.join(tables, " ") + "]");
    }
    if (LOG.isDebugEnabled()) {
      tables.forEach(table -> LOG.debug(Objects.toString(table)));
    }
    try (Table table = connection.getTable(tableName)) {
      Put put = createPutForIncrBackupTableSet(tables, backupRoot);
      table.put(put);
    }
  }

  /**
   * Deletes incremental backup set for a backup destination
   * @param backupRoot backup root
   */
  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Delete incremental backup table set from backup system table. ROOT=" + backupRoot);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete delete = createDeleteForIncrBackupTableSet(backupRoot);
      table.delete(delete);
    }
  }

  /**
   * Checks if we have at least one backup session in the backup system table. This API is used by
   * BackupLogCleaner
   * @return true if at least one session exists in the backup system table
   * @throws IOException exception
   */
  public boolean hasBackupSessions() throws IOException {
    LOG.trace("Has backup sessions from backup system table");

    boolean result = false;
    Scan scan = createScanForBackupHistory();
    scan.setCaching(1);
    try (Table table = connection.getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      if (scanner.next() != null) {
        result = true;
      }
      return result;
    }
  }

  /**
   * BACKUP SETS
   */

  /**
   * Get backup set list
   * @return backup set list
   * @throws IOException if a table or scanner operation fails
   */
  public List<String> listBackupSets() throws IOException {
    LOG.trace("Backup set list");

    List<String> list = new ArrayList<>();
    try (Table table = connection.getTable(tableName)) {
      Scan scan = createScanForBackupSetList();
      scan.readVersions(1);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result res;
        while ((res = scanner.next()) != null) {
          res.advance();
          list.add(cellKeyToBackupSetName(res.current()));
        }
        return list;
      }
    }
  }

  /**
   * Get backup set description (list of tables)
   * @param name set's name
   * @return list of tables in a backup set
   * @throws IOException if a table operation fails
   */
  public List<TableName> describeBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set describe: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      res.advance();
      String[] tables = cellValueToBackupSet(res.current());
      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
        .collect(Collectors.toList());
    }
  }

  /**
   * Add backup set (list of tables)
   * @param name      set name
   * @param newTables list of tables
   * @throws IOException if a table operation fails
   */
  public void addToBackupSet(String name, String[] newTables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
    }
    String[] union = null;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        union = newTables;
      } else {
        res.advance();
        String[] tables = cellValueToBackupSet(res.current());
        union = merge(tables, newTables);
      }
      Put put = createPutForBackupSet(name, union);
      table.put(put);
    }
  }

  /**
   * Remove tables from backup set (list of tables)
   * @param name     set name
   * @param toRemove list of tables
   * @throws IOException if a table operation or deleting the backup set fails
   */
  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Backup set remove from: " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
    }
    String[] disjoint;
    String[] tables;
    try (Table table = connection.getTable(tableName)) {
      Get get = createGetForBackupSet(name);
      Result res = table.get(get);
      if (res.isEmpty()) {
        LOG.warn("Backup set '" + name + "' not found.");
        return;
      } else {
        res.advance();
        tables = cellValueToBackupSet(res.current());
        disjoint = disjoin(tables, toRemove);
      }
      if (disjoint.length > 0 && disjoint.length != tables.length) {
        Put put = createPutForBackupSet(name, disjoint);
        table.put(put);
      } else if (disjoint.length == tables.length) {
        LOG.warn("Backup set '" + name + "' does not contain tables ["
          + StringUtils.join(toRemove, " ") + "]");
      } else { // disjoint.length == 0 and tables.length > 0
        // Delete backup set
        LOG.info("Backup set '" + name + "' is empty. Deleting.");
        deleteBackupSet(name);
      }
    }
  }

  private String[] merge(String[] existingTables, String[] newTables) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    tables.addAll(Arrays.asList(newTables));
    return tables.toArray(new String[0]);
  }

  private String[] disjoin(String[] existingTables, String[] toRemove) {
    Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
    Arrays.asList(toRemove).forEach(table -> tables.remove(table));
    return tables.toArray(new String[0]);
  }

  /**
   * Delete backup set
   * @param name set's name
   * @throws IOException if getting or deleting the table fails
   */
  public void deleteBackupSet(String name) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Backup set delete: " + name);
    }
    try (Table table = connection.getTable(tableName)) {
      Delete del = createDeleteForBackupSet(name);
      table.delete(del);
    }
  }

  /**
   * Get backup system table descriptor
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);

    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);

    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);

    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableName(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
    return TableName.valueOf(name);
  }

  public static String getTableNameAsString(Configuration conf) {
    return getTableName(conf).getNameAsString();
  }

  public static String getSnapshotName(Configuration conf) {
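    // e.g., a system table named "backup:system" (an illustrative name) yields
    // "snapshot_backup_system"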
    return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
  }

  /**
   * Get the descriptor of the backup system table for bulk loaded data
   * @return table's descriptor
   */
  public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));

    ColumnFamilyDescriptorBuilder colBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
    colBuilder.setMaxVersions(1);
    Configuration config = HBaseConfiguration.create();
    int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
    colBuilder.setTimeToLive(ttl);
    ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
    builder.setColumnFamily(colSessionsDesc);
    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
    colBuilder.setTimeToLive(ttl);
    builder.setColumnFamily(colBuilder.build());
    return builder.build();
  }

  public static TableName getTableNameForBulkLoadedData(Configuration conf) {
    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
    return TableName.valueOf(name);
  }

  /**
   * Creates Put operation for a given backup info object
   * @param context backup info
   * @return put operation
   * @throws IOException exception
   */
  private Put createPutForBackupInfo(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"),
      context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId backup's ID
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForBackupInfo(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId backup's ID
   * @return delete operation
   */
  private Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to backup system table
   * @return put operation
   */
  private Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
      Bytes.toBytes(startCode));
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from backup system table
   * @return get operation
   * @throws IOException exception
   */
  private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.readVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @return put operation
   */
  private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Delete for incremental backup table set
   * @param backupRoot backup root
   * @return delete operation
   */
  private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
    delete.addFamily(BackupSystemTable.META_FAMILY);
    return delete;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  private Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
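    // Increment the last byte of the prefix to obtain an exclusive stop row, so the scan
    // covers exactly the rows whose keys start with BACKUP_INFO_PREFIX.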
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current current cell
   * @return backup info instance
   * @throws IOException exception
   */
  private BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table table
   * @param smap  map, containing RS:ts
   * @return put operation
   */
  private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
    String backupRoot) {
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> { RS -> ts } map of maps
   * @return scan operation
   */
  private Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);

    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  private String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = Bytes.toString(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }
1434
1435  /**
1436   * Creates Put to store RS last log result
1437   * @param server    server name
1438   * @param timestamp log roll result (timestamp)
1439   * @return put operation
1440   */
1441  private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
1442    String backupRoot) {
1443    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
1444    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"),
1445      Bytes.toBytes(timestamp));
1446    return put;
1447  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @param backupRoot root directory for the backup
   * @return scan operation
   */
  private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);

    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row rowkey
   * @return server's name
   */
  private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = Bytes.toString(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /*
   * Creates Puts for a committed bulk load, as produced by running LoadIncrementalHFiles
   */
  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
    Map<byte[], List<Path>> finalPaths) {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
      for (Path path : entry.getValue()) {
        String file = path.toString();
        int lastSlash = file.lastIndexOf("/");
        String filename = file.substring(lastSlash + 1);
        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
          Bytes.toString(region), BLK_LD_DELIM, filename));
        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
        puts.add(put);
        LOG.debug(
          "writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
      }
    }
    return puts;
  }

  public static void snapshot(Connection conn) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      Configuration conf = conn.getConfiguration();
      admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
    }
  }
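
  /*
   * Usage sketch (illustrative only; the callers in the backup code drive this sequence
   * themselves): snapshot(Connection) above, together with restoreFromSnapshot(Connection)
   * and deleteSnapshot(Connection) below, forms an all-or-nothing guard around mutations
   * of the backup system table.
   *
   *   BackupSystemTable.snapshot(conn);
   *   try {
   *     // ... mutate the backup system table ...
   *     BackupSystemTable.deleteSnapshot(conn);
   *   } catch (IOException e) {
   *     BackupSystemTable.restoreFromSnapshot(conn);
   *   }
   */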

  public static void restoreFromSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.disableTable(BackupSystemTable.getTableName(conf));
        admin.restoreSnapshot(snapshotName);
        admin.enableTable(BackupSystemTable.getTableName(conf));
        LOG.debug("Done restoring backup system table");
      } else {
        // Snapshot does not exist, i.e. completeBackup failed after
        // deleting the backup system table snapshot.
        // In this case we log a WARN and proceed.
        LOG.warn(
          "Could not restore backup system table. Snapshot " + snapshotName + " does not exist.");
      }
    }
  }

  private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
    List<SnapshotDescription> list = admin.listSnapshots();
    for (SnapshotDescription desc : list) {
      if (desc.getName().equals(snapshotName)) {
        return true;
      }
    }
    return false;
  }

  public static boolean snapshotExists(Connection conn) throws IOException {
    // Use try-with-resources so the Admin instance is closed when done.
    try (Admin admin = conn.getAdmin()) {
      return snapshotExists(admin, getSnapshotName(conn.getConfiguration()));
    }
  }

  public static void deleteSnapshot(Connection conn) throws IOException {
    Configuration conf = conn.getConfiguration();
    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
    try (Admin admin = conn.getAdmin()) {
      String snapshotName = BackupSystemTable.getSnapshotName(conf);
      if (snapshotExists(admin, snapshotName)) {
        admin.deleteSnapshot(snapshotName);
        LOG.debug("Done deleting backup system table snapshot");
      } else {
        LOG.error("Snapshot " + snapshotName + " does not exist");
      }
    }
  }

  /*
   * Creates Puts for a prepared (staged, not yet committed) bulk load resulting from
   * running LoadIncrementalHFiles
   */
  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
    final List<Pair<Path, Path>> pairs) {
    List<Put> puts = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      Path path = pair.getSecond();
      String file = path.toString();
      int lastSlash = file.lastIndexOf("/");
      String filename = file.substring(lastSlash + 1);
      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
        Bytes.toString(region), BLK_LD_DELIM, filename));
      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
      puts.add(put);
      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
    }
    return puts;
  }
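
  // Note on the two bulk-load put factories: createPutForPreparedBulkload(...) records a
  // file with STATE_COL = BL_PREPARE when it is staged, and createPutForCommittedBulkload(...)
  // records it with STATE_COL = BL_COMMIT once the load completes, which appears to be what
  // lets the incremental backup logic tell staged files apart from committed ones.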

  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
    List<Delete> lstDels = new ArrayList<>(lst.size());
    for (TableName table : lst) {
      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
      del.addFamily(BackupSystemTable.META_FAMILY);
      lstDels.add(del);
    }
    return lstDels;
  }

  private Put createPutForDeleteOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(DELETE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  private Delete createDeleteForBackupDeleteOperation() {
    Delete delete = new Delete(DELETE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForDeleteOperation() {
    Get get = new Get(DELETE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

  public void startDeleteOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForDeleteOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishDeleteOperation() throws IOException {
    LOG.trace("Finish delete operation for backup ids");

    Delete delete = createDeleteForBackupDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
    LOG.trace("Get delete operation for backup ids");

    Get get = createGetForDeleteOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      // Decode with the same UTF-8 encoding the value was written with.
      return Bytes.toString(val).split(",");
    }
  }
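
  /*
   * Lifecycle sketch for the delete-operation bookkeeping above (illustrative only; the
   * actual driver is the backup delete command): record the ids, do the work, then clear
   * the marker row. After a crash, a non-null result from
   * getListOfBackupIdsFromDeleteOperation() signals an interrupted delete that should be
   * repaired before new operations run.
   *
   *   systemTable.startDeleteOperation(backupIds);
   *   try {
   *     // ... physically remove the backup images ...
   *   } finally {
   *     systemTable.finishDeleteOperation();
   *   }
   */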

  private Put createPutForMergeOperation(String[] backupIdList) {
    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, FAM_COL, value);
    return put;
  }

  public boolean isMergeInProgress() throws IOException {
    Get get = new Get(MERGE_OP_ROW);
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      return (!res.isEmpty());
    }
  }

  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
    Put put = new Put(MERGE_OP_ROW);
    put.addColumn(META_FAMILY, PATH_COL, value);
    return put;
  }

  private Delete createDeleteForBackupMergeOperation() {
    Delete delete = new Delete(MERGE_OP_ROW);
    delete.addFamily(META_FAMILY);
    return delete;
  }

  private Get createGetForMergeOperation() {
    Get get = new Get(MERGE_OP_ROW);
    get.addFamily(META_FAMILY);
    return get;
  }

  public void startMergeOperation(String[] backupIdList) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList, ","));
    }
    Put put = createPutForMergeOperation(backupIdList);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
    }
    Put put = createPutForUpdateTablesForMerge(tables);
    try (Table table = connection.getTable(tableName)) {
      table.put(put);
    }
  }

  public void finishMergeOperation() throws IOException {
    LOG.trace("Finish merge operation for backup ids");

    Delete delete = createDeleteForBackupMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      table.delete(delete);
    }
  }

  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
    LOG.trace("Get backup ids for merge operation");

    Get get = createGetForMergeOperation();
    try (Table table = connection.getTable(tableName)) {
      Result res = table.get(get);
      if (res.isEmpty()) {
        return null;
      }
      Cell cell = res.listCells().get(0);
      byte[] val = CellUtil.cloneValue(cell);
      if (val.length == 0) {
        return null;
      }
      // Decode with the same UTF-8 encoding the value was written with.
      return Bytes.toString(val).split(",");
    }
  }
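
  // The merge bookkeeping mirrors the delete bookkeeping above: startMergeOperation(...)
  // writes the MERGE_OP_ROW marker, isMergeInProgress() probes it, and
  // finishMergeOperation() clears it, so an interrupted merge can be detected on restart.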

  static Scan createScanForOrigBulkLoadedFiles(TableName table) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    String[] parts = rowStr.split(BLK_LD_DELIM);
    return parts[1];
  }

  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    String[] parts = rowStr.split(BLK_LD_DELIM);
    int idx = 3;
    if (parts.length == 4) {
      // the table is in the default namespace, so there is no namespace component
      idx = 2;
    }
    LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
    return parts[idx];
  }
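
  /*
   * Worked example for the parsing above, with hypothetical values and assuming
   * BLK_LD_DELIM renders as ":": a row string "bulk:ns1:tbl1:region1:file1" splits into
   * ["bulk", "ns1", "tbl1", "region1", "file1"], putting the region at index 3; for a
   * default-namespace table the row is "bulk:tbl1:region1:file1" (length 4) and the
   * region sits at index 2.
   */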

  /**
   * Used to query bulk loaded hfiles which have been copied by incremental backup
   * @param backupId the backup Id. It can be null when querying for all tables
   * @return the Scan object
   */
  static Scan createScanForBulkLoadedFiles(String backupId) {
    Scan scan = new Scan();
    byte[] startRow =
      backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.readVersions(1);
    return scan;
  }

  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
    long ts, int idx) {
    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(p));
    return put;
  }

  /**
   * Creates Scan operation to load backup set list
   * @return scan operation
   */
  private Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.withStartRow(startRow);
    scan.withStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation to load backup set content
   * @param name backup set's name
   * @return get operation
   */
  private Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Delete operation to delete backup set content
   * @param name backup set's name
   * @return delete operation
   */
  private Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates Put operation to update backup set content
   * @param name   backup set's name
   * @param tables list of tables
   * @return put operation
   */
  private Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
    return put;
  }

  private byte[] convertToByteArray(String[] tables) {
    return Bytes.toBytes(StringUtils.join(tables, ","));
  }
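
  // Storage note for the backup-set helpers above: each set is one row keyed
  // SET_KEY_PREFIX + name whose "tables" column holds a comma-joined table list, e.g.
  // (hypothetical values) convertToByteArray(new String[] { "ns1:t1", "ns1:t2" }) stores
  // the bytes of "ns1:t1,ns1:t2", and cellValueToBackupSet(Cell) splits them back apart.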

  /**
   * Converts cell to backup set list.
   * @param current current cell
   * @return backup set as array of table names
   */
  private String[] cellValueToBackupSet(Cell current) {
    byte[] data = CellUtil.cloneValue(current);
    if (!ArrayUtils.isEmpty(data)) {
      return Bytes.toString(data).split(",");
    }
    return new String[0];
  }

  /**
   * Converts cell key to backup set name.
   * @param current current cell
   * @return backup set name
   */
  private String cellKeyToBackupSetName(Cell current) {
    byte[] data = CellUtil.cloneRow(current);
    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
  }

  private static byte[] rowkey(String s, String... other) {
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return Bytes.toBytes(sb.toString());
  }
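
  // Example for rowkey(...) above (hypothetical arguments): rowkey(SET_KEY_PREFIX, "weekly")
  // simply concatenates its pieces and UTF-8 encodes the result, so all row-key structure
  // in this table comes from the prefixes and delimiters the callers pass in.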
}