/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is only a base for other integration-level backup tests. Do not add tests here.
 * Tests that do not require bringing machines up and down should go into TestBackupSmallTests.
 * All other tests should have their own classes and extend this one.
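 * <p>
 * A typical subclass only adds test methods. A minimal sketch (hypothetical class and test
 * names, shown for illustration only):
 *
 * <pre>
 * public class TestMyFullBackup extends TestBackupBase {
 *   &#64;Test
 *   public void testFullBackupSingleTable() throws Exception {
 *     String backupId = fullTableBackup(toList(table1.getNameAsString()));
 *     assertTrue(checkSucceeded(backupId));
 *   }
 * }
 * </pre>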
 */
public class TestBackupBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestBackupBase.class);

  protected static HBaseTestingUtil TEST_UTIL;
  protected static HBaseTestingUtil TEST_UTIL2;
  protected static Configuration conf1;
  protected static Configuration conf2;

  protected static TableName table1 = TableName.valueOf("table1");
  protected static TableDescriptor table1Desc;
  protected static TableName table2 = TableName.valueOf("table2");
  protected static TableName table3 = TableName.valueOf("table3");
  protected static TableName table4 = TableName.valueOf("table4");

  protected static TableName table1_restore = TableName.valueOf("default:table1");
  protected static TableName table2_restore = TableName.valueOf("ns2:table2");
  protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");

  protected static final int NB_ROWS_IN_BATCH = 99;
  protected static final byte[] qualName = Bytes.toBytes("q1");
  protected static final byte[] famName = Bytes.toBytes("f");

  protected static String BACKUP_ROOT_DIR;
  protected static String BACKUP_REMOTE_ROOT_DIR;
  protected static String provider = "defaultProvider";
  protected static boolean secure = false;

  protected static boolean autoRestoreOnFailure;
  protected static boolean useSecondCluster;

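  /**
   * Incremental table backup client that calls failStageIf between phases, letting tests inject
   * a failure at a chosen stage of the backup.
   */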
  static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
    public IncrementalTableBackupClientForTest() {
    }

    public IncrementalTableBackupClientForTest(Connection conn, String backupId,
        BackupRequest request) throws IOException {
      super(conn, backupId, request);
    }

    @Override
    public void execute() throws IOException {
      // case INCREMENTAL_COPY:
      try {
        // case PREPARE_INCREMENTAL:
        failStageIf(Stage.stage_0);
        beginBackup(backupManager, backupInfo);

        failStageIf(Stage.stage_1);
        backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
        LOG.debug("For incremental backup, current table set is "
            + backupManager.getIncrementalBackupTableSet());
        newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
        // copy out the table and region info files for each table
        BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
        // convert WALs to HFiles and copy them to .tmp under BACKUP_ROOT
        convertWALsToHFiles();
        incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
          backupInfo.getBackupRootDir());
        failStageIf(Stage.stage_2);

        // case INCR_BACKUP_COMPLETE:
        // Set the overall backup status to complete. After this checkpoint, the backup will be
        // reported as finished even if a cancel request comes in.
        // Set the previousTimestampMap, which is from before the current log roll, on the
        // manifest.
        Map<TableName, Map<String, Long>> previousTimestampMap =
            backupManager.readLogTimestampMap();
        backupInfo.setIncrTimestampMap(previousTimestampMap);

        // The table list in backupInfo is good for both full backup and incremental backup.
        // For incremental backup, it contains the incremental backup table set.
        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
        failStageIf(Stage.stage_3);

        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
            backupManager.readLogTimestampMap();

        Long newStartCode =
            BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
        backupManager.writeBackupStartCode(newStartCode);

        handleBulkLoad(backupInfo.getTableNames());
        failStageIf(Stage.stage_4);

        // backup complete
        completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

      } catch (Exception e) {
        failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
          BackupType.INCREMENTAL, conf);
        throw new IOException(e);
      }
    }
  }

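  /**
   * Full table backup client with the same stage-based failure injection as
   * IncrementalTableBackupClientForTest; used to exercise failure handling and rollback.
   */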
  static class FullTableBackupClientForTest extends FullTableBackupClient {
    public FullTableBackupClientForTest() {
    }

    public FullTableBackupClientForTest(Connection conn, String backupId, BackupRequest request)
        throws IOException {
      super(conn, backupId, request);
    }

    @Override
    public void execute() throws IOException {
      // Get the stage ID to fail on
      try (Admin admin = conn.getAdmin()) {
        // Begin BACKUP
        beginBackup(backupManager, backupInfo);
        failStageIf(Stage.stage_0);
        // do snapshot for full table backup
        String savedStartCode = backupManager.readBackupStartCode();
        boolean firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
        if (firstBackup) {
          // This is our first backup. Let's put a marker into the system table so that we can
          // hold the logs while we do the backup.
          backupManager.writeBackupStartCode(0L);
        }
        failStageIf(Stage.stage_1);
        // We roll the log here before we do the snapshot. It is possible there is duplicate data
        // in the log that is already in the snapshot. But if we do it after the snapshot, we
        // could have data loss.
        // A better approach is to do the log roll on each RS in the same global procedure as
        // the snapshot.
        LOG.info("Execute roll log procedure for full backup ...");

        Map<String, String> props = new HashMap<>();
        props.put("backupRoot", backupInfo.getBackupRootDir());
        admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
          LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
        failStageIf(Stage.stage_2);
        newTimestamps = backupManager.readRegionServerLastLogRollResult();

        // SNAPSHOT_TABLES:
        backupInfo.setPhase(BackupPhase.SNAPSHOT);
        for (TableName tableName : tableList) {
          String snapshotName = "snapshot_" + EnvironmentEdgeManager.currentTime() + "_"
              + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

          snapshotTable(admin, tableName, snapshotName);
          backupInfo.setSnapshotName(tableName, snapshotName);
        }
        failStageIf(Stage.stage_3);
        // SNAPSHOT_COPY:
        // do snapshot copy
        LOG.debug("snapshot copy for " + backupId);
        snapshotCopy(backupInfo);
        // Updates incremental backup table set
        backupManager.addIncrementalBackupTableSet(backupInfo.getTables());

        // BACKUP_COMPLETE:
        // Set the overall backup status to complete. After this checkpoint, the backup will be
        // reported as finished even if a cancel request comes in.
        backupInfo.setState(BackupState.COMPLETE);
        // The table list in backupInfo is good for both full backup and incremental backup.
        // For incremental backup, it contains the incremental backup table set.
        backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

        Map<TableName, Map<String, Long>> newTableSetTimestampMap =
            backupManager.readLogTimestampMap();

        Long newStartCode =
            BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
        backupManager.writeBackupStartCode(newStartCode);
        failStageIf(Stage.stage_4);
        // backup complete
        completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);

      } catch (Exception e) {
        if (autoRestoreOnFailure) {
          failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
            BackupType.FULL, conf);
        }
        throw new IOException(e);
      }
    }
  }

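  /**
   * Starts the mini cluster (and optionally a second one) with the backup feature enabled and
   * creates the test tables. Subclasses that need a different setup set the static flags
   * ({@code secure}, {@code provider}, {@code useSecondCluster}) before calling this from their
   * own {@code @BeforeClass} method.
   */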
  public static void setUpHelper() throws Exception {
    BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
    BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";

    if (secure) {
      // set the always-on security provider
      UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
          HadoopSecurityEnabledUserProviderForTesting.class);
      // setup configuration
      SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
    }
    conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf1);
    BackupManager.decorateRegionServerConfiguration(conf1);
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // Set TTL for old WALs to 1 sec to enforce fast cleaning of archived WAL files
    conf1.setLong(TimeToLiveLogCleaner.TTL_CONF_KEY, 1000);
    conf1.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 1000);

    // Set the WAL provider; subclasses may set "multiwal" (2 WAL files per RS by default)
    conf1.set(WALFactory.WAL_PROVIDER, provider);
    TEST_UTIL.startMiniCluster();

    if (useSecondCluster) {
      conf2 = HBaseConfiguration.create(conf1);
      conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
      TEST_UTIL2 = new HBaseTestingUtil(conf2);
      TEST_UTIL2.setZkCluster(TEST_UTIL.getZkCluster());
      TEST_UTIL2.startMiniDFSCluster(3);
      String root2 = TEST_UTIL2.getConfiguration().get("fs.defaultFS");
      Path p = new Path(new Path(root2), "/tmp/wal");
      CommonFSUtils.setWALRootDir(TEST_UTIL2.getConfiguration(), p);
      TEST_UTIL2.startMiniCluster();
    }
    conf1 = TEST_UTIL.getConfiguration();

    TEST_UTIL.startMiniMapReduceCluster();
    BACKUP_ROOT_DIR =
        new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
          BACKUP_ROOT_DIR).toString();
    LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
    if (useSecondCluster) {
      BACKUP_REMOTE_ROOT_DIR =
          new Path(new Path(TEST_UTIL2.getConfiguration().get("fs.defaultFS")),
            BACKUP_REMOTE_ROOT_DIR).toString();
      LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
    }
    createTables();
    populateFromMasterConfig(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), conf1);
  }

  /**
   * Set up the cluster with appropriate configurations before running tests.
   *
   * @throws Exception if starting the mini cluster or setting up the tables fails
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL = new HBaseTestingUtil();
    conf1 = TEST_UTIL.getConfiguration();
    autoRestoreOnFailure = true;
    useSecondCluster = false;
    setUpHelper();
  }

  private static void populateFromMasterConfig(Configuration masterConf, Configuration conf) {
    Iterator<Entry<String, String>> it = masterConf.iterator();
    while (it.hasNext()) {
      Entry<String, String> e = it.next();
      conf.set(e.getKey(), e.getValue());
    }
  }

  /**
   * @throws Exception if deleting the archive directory or shutting down the mini cluster fails
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getAdmin());
    } catch (Exception e) {
      // ignore: snapshots may already be gone
    }
    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
    if (useSecondCluster) {
      TEST_UTIL2.shutdownMiniCluster();
    }
    TEST_UTIL.shutdownMiniCluster();
    TEST_UTIL.shutdownMiniMapReduceCluster();
    autoRestoreOnFailure = true;
    useSecondCluster = false;
  }

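  /**
   * Writes {@code numRows} rows with keys of the form {@code row-<table>-<id>-<i>} into the
   * given table and returns the still-open Table; the caller is responsible for closing it.
   */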
  Table insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
      throws IOException {
    Table t = conn.getTable(table);
    for (int i = 0; i < numRows; i++) {
      Put p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-" + i));
      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
      t.put(p1);
    }
    return t;
  }

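  /**
   * Builds a {@link BackupRequest} for the given backup type, table list and target root
   * directory.
   */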
  protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables,
      String path) {
    BackupRequest.Builder builder = new BackupRequest.Builder();
    return builder.withBackupType(type).withTableList(tables).withTargetRootDir(path).build();
  }

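  /**
   * Runs a backup of the given type over the given tables and returns the new backup id; a
   * failed backup surfaces as an IOException from {@link BackupAdmin#backupTables(BackupRequest)}.
   */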
  protected String backupTables(BackupType type, List<TableName> tables, String path)
      throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf1);
        BackupAdmin badmin = new BackupAdminImpl(conn)) {
      BackupRequest request = createBackupRequest(type, tables, path);
      return badmin.backupTables(request);
    }
  }

  protected String fullTableBackup(List<TableName> tables) throws IOException {
    return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
  }

  protected String incrementalTableBackup(List<TableName> tables) throws IOException {
    return backupTables(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
  }

  protected static void loadTable(Table table) throws Exception {
    // write NB_ROWS_IN_BATCH rows, skipping the WAL to keep the load fast
    Put p;
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.setDurability(Durability.SKIP_WAL);
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      table.put(p);
    }
  }

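  /**
   * Creates the ns1-ns4 namespaces plus the four test tables; table1 and table2 are pre-loaded
   * with NB_ROWS_IN_BATCH rows each, table3 and table4 are created empty.
   */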
  protected static void createTables() throws Exception {
    long tid = EnvironmentEdgeManager.currentTime();
    table1 = TableName.valueOf("test-" + tid);
    Admin ha = TEST_UTIL.getAdmin();

    // Create namespaces
    ha.createNamespace(NamespaceDescriptor.create("ns1").build());
    ha.createNamespace(NamespaceDescriptor.create("ns2").build());
    ha.createNamespace(NamespaceDescriptor.create("ns3").build());
    ha.createNamespace(NamespaceDescriptor.create("ns4").build());

    TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
    ha.createTable(desc);
    table1Desc = desc;
    Connection conn = ConnectionFactory.createConnection(conf1);
    Table table = conn.getTable(table1);
    loadTable(table);
    table.close();
    table2 = TableName.valueOf("ns2:test-" + (tid + 1));
    desc = TableDescriptorBuilder.newBuilder(table2)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
    ha.createTable(desc);
    table = conn.getTable(table2);
    loadTable(table);
    table.close();
    table3 = TableName.valueOf("ns3:test-" + (tid + 2));
    table = TEST_UTIL.createTable(table3, famName);
    table.close();
    table4 = TableName.valueOf("ns4:test-" + (tid + 3));
    table = TEST_UTIL.createTable(table4, famName);
    table.close();
    ha.close();
    conn.close();
  }

  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);

    if (status == null) {
      return false;
    }

    return status.getState() == BackupState.COMPLETE;
  }

  protected boolean checkFailed(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);

    if (status == null) {
      return false;
    }

    return status.getState() == BackupState.FAILED;
  }

  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  protected BackupAdmin getBackupAdmin() throws IOException {
    return new BackupAdminImpl(TEST_UTIL.getConnection());
  }

  /**
   * Converts the given table name strings into a list of {@link TableName} instances.
   */
  protected List<TableName> toList(String... args) {
    List<TableName> ret = new ArrayList<>();
    for (String arg : args) {
      ret.add(TableName.valueOf(arg));
    }
    return ret;
  }

  protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
    Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
    FileSystem fs = logRoot.getFileSystem(c);
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
    List<FileStatus> logFiles = new ArrayList<>();
    while (it.hasNext()) {
      LocatedFileStatus lfs = it.next();
      if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
        logFiles.add(lfs);
        LOG.info(Objects.toString(lfs));
      }
    }
    return logFiles;
  }

  protected void dumpBackupDir() throws IOException {
    // Dump the contents of the backup root directory
    FileSystem fs = FileSystem.get(conf1);
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(BACKUP_ROOT_DIR), true);
    while (it.hasNext()) {
      LOG.debug(Objects.toString(it.next().getPath()));
    }
  }
}