/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for the integration-level backup tests; do not add tests here. Tests that do not
 * require bringing machines up or down should go into TestBackupSmallTests. All other tests
 * should get their own classes and extend this one.
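 * <p>
 * As an illustrative sketch (the subclass and test names below are hypothetical), a typical
 * subclass drives the helpers defined in this class:
 *
 * <pre>
 * public class TestMyBackupFeature extends TestBackupBase {
 *   &#64;Test
 *   public void testFullBackupSucceeds() throws Exception {
 *     List&lt;TableName&gt; tables = Arrays.asList(table1, table2);
 *     String backupId = fullTableBackup(tables);
 *     assertTrue(checkSucceeded(backupId));
 *   }
 * }
 * </pre>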
 */
public class TestBackupBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestBackupBase.class);

  protected static HBaseTestingUtil TEST_UTIL;
  protected static HBaseTestingUtil TEST_UTIL2;
  protected static Configuration conf1;
  protected static Configuration conf2;

  protected static TableName table1 = TableName.valueOf("table1");
  protected static TableDescriptor table1Desc;
  protected static TableName table2 = TableName.valueOf("table2");
  protected static TableName table3 = TableName.valueOf("table3");
  protected static TableName table4 = TableName.valueOf("table4");

  protected static TableName table1_restore = TableName.valueOf("default:table1_restore");
  protected static TableName table2_restore = TableName.valueOf("ns2:table2_restore");
  protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");

  protected static final int NB_ROWS_IN_BATCH = 99;
  protected static final byte[] qualName = Bytes.toBytes("q1");
  protected static final byte[] famName = Bytes.toBytes("f");

  protected static String BACKUP_ROOT_DIR;
  protected static String BACKUP_REMOTE_ROOT_DIR;
  protected static String provider = "defaultProvider";
  protected static boolean secure = false;

  protected static boolean autoRestoreOnFailure;
  protected static boolean useSecondCluster;

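  /**
   * Incremental table backup client whose {@code execute()} can be forced to fail at a chosen
   * {@code Stage} through {@code failStageIf(...)}, which lets subclasses exercise failure and
   * recovery paths.
   */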
  static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
    public IncrementalTableBackupClientForTest() {
    }

    public IncrementalTableBackupClientForTest(Connection conn, String backupId,
      BackupRequest request) throws IOException {
      super(conn, backupId, request);
    }

    @Override
    public void execute() throws IOException {
      // case INCREMENTAL_COPY:
      try {
        // case PREPARE_INCREMENTAL:
        failStageIf(Stage.stage_0);
        beginBackup(backupManager, backupInfo);

        failStageIf(Stage.stage_1);
        backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
        LOG.debug("For incremental backup, current table set is "
          + backupManager.getIncrementalBackupTableSet());
        newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
        // copy out the table and region info files for each table
        BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
        // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
        convertWALsToHFiles();
        incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
          backupInfo.getBackupRootDir());
        failStageIf(Stage.stage_2);

        // case INCR_BACKUP_COMPLETE:
        // Set the overall backup status to "complete". After this checkpoint, the backup is
        // allowed to finish even if a cancel request comes in.
        // Set the previousTimestampMap, taken before this log roll, on the manifest.
        Map<TableName, Map<String, Long>> previousTimestampMap =
          backupManager.readLogTimestampMap();
        backupInfo.setIncrTimestampMap(previousTimestampMap);

        // The table list in backupInfo is good for both full backup and incremental backup.
        // For incremental backup, it contains the incremental backup table set.
        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
        failStageIf(Stage.stage_3);

        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
          backupManager.readLogTimestampMap();

        Long newStartCode =
          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
        backupManager.writeBackupStartCode(newStartCode);

        handleBulkLoad(backupInfo.getTableNames());
        failStageIf(Stage.stage_4);

        // backup complete
        completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

      } catch (Exception e) {
        failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
          BackupType.INCREMENTAL, conf);
        throw new IOException(e);
      }
    }
  }

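  /**
   * Full table backup client with the same staged failure injection as
   * {@link IncrementalTableBackupClientForTest}.
   */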
  static class FullTableBackupClientForTest extends FullTableBackupClient {
    public FullTableBackupClientForTest() {
    }

    public FullTableBackupClientForTest(Connection conn, String backupId, BackupRequest request)
      throws IOException {
      super(conn, backupId, request);
    }

    @Override
    public void execute() throws IOException {
      // Get the stage ID to fail on
      try (Admin admin = conn.getAdmin()) {
        // Begin BACKUP
        beginBackup(backupManager, backupInfo);
        failStageIf(Stage.stage_0);
        String savedStartCode;
        boolean firstBackup;
        // do snapshot for full table backup
        savedStartCode = backupManager.readBackupStartCode();
        firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
        if (firstBackup) {
          // This is our first backup. Write a marker to the system table so that we can hold the
          // logs while we do the backup.
          backupManager.writeBackupStartCode(0L);
        }
        failStageIf(Stage.stage_1);
        // We roll the log here before we do the snapshot. It is possible there is duplicate data
        // in the log that is already in the snapshot. But if we do it after the snapshot, we
        // could have data loss.
        // A better approach is to do the log roll on each RS in the same global procedure as
        // the snapshot.
        LOG.info("Execute roll log procedure for full backup ...");

        BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
        failStageIf(Stage.stage_2);
        newTimestamps = backupManager.readRegionServerLastLogRollResult();

        // SNAPSHOT_TABLES:
        backupInfo.setPhase(BackupPhase.SNAPSHOT);
        for (TableName tableName : tableList) {
          String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
            + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

          snapshotTable(admin, tableName, snapshotName);
          backupInfo.setSnapshotName(tableName, snapshotName);
        }
        failStageIf(Stage.stage_3);
        // SNAPSHOT_COPY:
        // do snapshot copy
        LOG.debug("snapshot copy for " + backupId);
        snapshotCopy(backupInfo);
        // Updates incremental backup table set
        backupManager.addIncrementalBackupTableSet(backupInfo.getTables());

        // BACKUP_COMPLETE:
        // Set the overall backup status to "complete". After this checkpoint, the backup is
        // allowed to finish even if a cancel request comes in.
        backupInfo.setState(BackupState.COMPLETE);
        // The table list in backupInfo is good for both full backup and incremental backup.
        // For incremental backup, it contains the incremental backup table set.
        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
          backupManager.readLogTimestampMap();

        Long newStartCode =
          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
        backupManager.writeBackupStartCode(newStartCode);
        failStageIf(Stage.stage_4);
        // backup complete
        completeBackup(conn, backupInfo, BackupType.FULL, conf);

      } catch (Exception e) {
        if (autoRestoreOnFailure) {
          failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
            BackupType.FULL, conf);
        }
        throw new IOException(e);
      }
    }
  }

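  /**
   * Brings up the mini cluster(s) with backup enabled and creates the test tables. Expects
   * {@code TEST_UTIL} and {@code conf1} to have been initialized by the caller, as in
   * {@link #setUp()}.
   */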
  public static void setUpHelper() throws Exception {
    BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
    BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";

    if (secure) {
      // set the always-on security provider
      UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
        HadoopSecurityEnabledUserProviderForTesting.class);
      // setup configuration
      SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
    }
    conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf1);
    BackupManager.decorateRegionServerConfiguration(conf1);
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // Set TTL for old WALs to 1 sec to enforce fast cleaning of archived WAL files
    conf1.setLong(TimeToLiveLogCleaner.TTL_CONF_KEY, 1000);
    conf1.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 1000);

    // Set the WAL provider (subclasses may override "provider", e.g. to MultiWAL with 2 default
    // WAL files per RS)
    conf1.set(WALFactory.WAL_PROVIDER, provider);
    TEST_UTIL.startMiniCluster();

    if (useSecondCluster) {
      conf2 = HBaseConfiguration.create(conf1);
      conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
      TEST_UTIL2 = new HBaseTestingUtil(conf2);
      TEST_UTIL2.setZkCluster(TEST_UTIL.getZkCluster());
      TEST_UTIL2.startMiniDFSCluster(3);
      String root2 = TEST_UTIL2.getConfiguration().get("fs.defaultFS");
      Path p = new Path(new Path(root2), "/tmp/wal");
      CommonFSUtils.setWALRootDir(TEST_UTIL2.getConfiguration(), p);
      TEST_UTIL2.startMiniCluster();
    }
    conf1 = TEST_UTIL.getConfiguration();

    TEST_UTIL.startMiniMapReduceCluster();
    BACKUP_ROOT_DIR =
      new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), BACKUP_ROOT_DIR)
        .toString();
    LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
    if (useSecondCluster) {
      BACKUP_REMOTE_ROOT_DIR =
        new Path(new Path(TEST_UTIL2.getConfiguration().get("fs.defaultFS")),
          BACKUP_REMOTE_ROOT_DIR).toString();
      LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
    }
    createTables();
    populateFromMasterConfig(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), conf1);
  }

  /**
   * Sets up the cluster with the appropriate configuration before running tests.
   * @throws Exception if starting the mini cluster or setting up the tables fails
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    conf1 = TEST_UTIL.getConfiguration();
    autoRestoreOnFailure = true;
    useSecondCluster = false;
    setUpHelper();
  }

  @Before
  public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
    // Not every operation here is needed by any given test, and some are often no-ops;
    // the goal is to isolate the tests that extend TestBackupBase from one another.
    try (BackupAdmin backupAdmin = getBackupAdmin();
      BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
      // End the previous backup session, if one is still registered (the equivalent of
      // BackupManager#finishBackupSession)
      systemTable.finishBackupExclusiveOperation();
      backupAdmin.listBackupSets().forEach(backupSet -> {
        try {
          backupAdmin.deleteBackupSet(backupSet.getName());
        } catch (IOException ignored) {
        }
      });
    } catch (Exception ignored) {
    }
    Arrays.stream(TEST_UTIL.getAdmin().listTableNames())
      .filter(tableName -> !tableName.isSystemTable()).forEach(tableName -> {
        try {
          TEST_UTIL.truncateTable(tableName);
        } catch (IOException ignored) {
        }
      });
    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
      try {
        LogRoller walRoller = rst.getRegionServer().getWalRoller();
        walRoller.requestRollAll();
        walRoller.waitUntilWalRollFinished();
      } catch (Exception ignored) {
      }
    });
  }

  private static void populateFromMasterConfig(Configuration masterConf, Configuration conf) {
    Iterator<Entry<String, String>> it = masterConf.iterator();
    while (it.hasNext()) {
      Entry<String, String> e = it.next();
      conf.set(e.getKey(), e.getValue());
    }
  }

  @AfterClass
  public static void tearDown() throws Exception {
    try {
      SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getAdmin());
    } catch (Exception e) {
      // ignored: there may be no snapshots left to delete
    }
    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
    if (useSecondCluster) {
      TEST_UTIL2.shutdownMiniCluster();
    }
    TEST_UTIL.shutdownMiniCluster();
    TEST_UTIL.shutdownMiniMapReduceCluster();
    autoRestoreOnFailure = true;
    useSecondCluster = false;
  }

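  /**
   * Inserts {@code numRows} rows with keys of the form {@code row-<table>-<id>-<i>} into the given
   * table. The returned {@link Table} is left open and must be closed by the caller.
   */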
  Table insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
    throws IOException {
    Table t = conn.getTable(table);
    for (int i = 0; i < numRows; i++) {
      Put p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-" + i));
      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
      t.put(p1);
    }
    return t;
  }

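  /**
   * Builds a {@link BackupRequest} for the given backup type, tables and target root directory,
   * with checksum verification enabled.
   */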
  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables,
    String path) {
    return createBackupRequest(type, tables, path, false);
  }

  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path,
    boolean noChecksumVerify) {
    BackupRequest.Builder builder = new BackupRequest.Builder();
    return builder.withBackupType(type).withTableList(tables).withTargetRootDir(path)
      .withNoChecksumVerify(noChecksumVerify).build();
  }

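  /**
   * Runs a backup of the given type for the given tables into {@code path} and returns the new
   * backup id.
   */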
  protected String backupTables(BackupType type, List<TableName> tables, String path)
    throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf1);
      BackupAdmin badmin = new BackupAdminImpl(conn)) {
      BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
      return badmin.backupTables(request);
    }
  }

  protected String fullTableBackup(List<TableName> tables) throws IOException {
    return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
  }

  protected String incrementalTableBackup(List<TableName> tables) throws IOException {
    return backupTables(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
  }

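  /**
   * Loads {@code NB_ROWS_IN_BATCH} rows into the given table, skipping the WAL for speed.
   */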
  protected static void loadTable(Table table) throws Exception {
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.setDurability(Durability.SKIP_WAL);
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      table.put(p);
    }
  }

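  /**
   * Creates namespaces ns1 through ns4 and the four test tables: table1 in the default namespace
   * and table2 through table4 in ns2 through ns4. Data is loaded into table1 and table2. Table
   * names embed the current time so that repeated runs do not collide.
   */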
  protected static void createTables() throws Exception {
    long tid = EnvironmentEdgeManager.currentTime();
    table1 = TableName.valueOf("test-" + tid);
    Admin ha = TEST_UTIL.getAdmin();

    // Create namespaces
    ha.createNamespace(NamespaceDescriptor.create("ns1").build());
    ha.createNamespace(NamespaceDescriptor.create("ns2").build());
    ha.createNamespace(NamespaceDescriptor.create("ns3").build());
    ha.createNamespace(NamespaceDescriptor.create("ns4").build());

    TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
    ha.createTable(desc);
    table1Desc = desc;
    Connection conn = ConnectionFactory.createConnection(conf1);
    Table table = conn.getTable(table1);
    loadTable(table);
    table.close();
    // note: string concatenation, so the qualifier becomes "test-<tid>1", not tid + 1
    table2 = TableName.valueOf("ns2:test-" + tid + 1);
    desc = TableDescriptorBuilder.newBuilder(table2)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
    ha.createTable(desc);
    table = conn.getTable(table2);
    loadTable(table);
    table.close();
    table3 = TableName.valueOf("ns3:test-" + tid + 2);
    table = TEST_UTIL.createTable(table3, famName);
    table.close();
    table4 = TableName.valueOf("ns4:test-" + tid + 3);
    table = TEST_UTIL.createTable(table4, famName);
    table.close();
    ha.close();
    conn.close();
  }

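  /**
   * Returns whether the backup with the given id reached the COMPLETE state.
   */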
  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.COMPLETE;
  }

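  /**
   * Returns whether the backup with the given id reached the FAILED state.
   */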
  protected boolean checkFailed(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.FAILED;
  }

  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  protected static BackupAdmin getBackupAdmin() throws IOException {
    return new BackupAdminImpl(TEST_UTIL.getConnection());
  }

  /**
   * Converts the given table name strings into a list of {@link TableName} instances.
   */
  protected List<TableName> toList(String... args) {
    List<TableName> ret = new ArrayList<>();
    for (String arg : args) {
      ret.add(TableName.valueOf(arg));
    }
    return ret;
  }

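  /**
   * Returns all non-meta WAL files currently under the WAL root directory for the given
   * configuration, recursing into per-region-server subdirectories.
   */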
  protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
    Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
    FileSystem fs = logRoot.getFileSystem(c);
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
    List<FileStatus> logFiles = new ArrayList<>();
    while (it.hasNext()) {
      LocatedFileStatus lfs = it.next();
      if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
        logFiles.add(lfs);
        LOG.info(Objects.toString(lfs));
      }
    }
    return logFiles;
  }

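  /**
   * Logs every file under {@code BACKUP_ROOT_DIR} at DEBUG level, for troubleshooting.
   */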
  protected void dumpBackupDir() throws IOException {
    FileSystem fs = FileSystem.get(conf1);
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(BACKUP_ROOT_DIR), true);
    while (it.hasNext()) {
      LOG.debug(Objects.toString(it.next().getPath()));
    }
  }
}