001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.nio.charset.Charset;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Set;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.backup.BackupAdmin;
032import org.apache.hadoop.hbase.backup.BackupInfo;
033import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
034import org.apache.hadoop.hbase.backup.BackupRequest;
035import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
036import org.apache.hadoop.hbase.backup.BackupType;
037import org.apache.hadoop.hbase.backup.RestoreRequest;
038import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
039import org.apache.hadoop.hbase.backup.impl.BackupManager;
040import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
041import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
042import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
043import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
044import org.apache.hadoop.hbase.chaos.policies.Policy;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.testclassification.IntegrationTests;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.util.ToolRunner;
055import org.junit.After;
056import org.junit.Assert;
057import org.junit.Before;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
064import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
066import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
067
/**
 * An integration test to detect regressions in HBASE-7912. Creates a table with many regions,
 * loads data, performs a series of backup/load operations, then restores and verifies the data.
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
 */
074@Category(IntegrationTests.class)
075public class IntegrationTestBackupRestore extends IntegrationTestBase {
076  private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
077  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
078  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
079  protected static final String COLUMN_NAME = "f";
080  protected static final String REGION_COUNT_KEY = "regions_per_rs";
081  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
082  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
083  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
084  protected static final int DEFAULT_REGION_COUNT = 10;
085  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
086  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
087  protected static final int DEFAULT_NUM_ITERATIONS = 10;
088  protected static final int DEFAULT_ROWS_IN_ITERATION = 10000;
089  protected static final String SLEEP_TIME_KEY = "sleeptime";
090  // short default interval because tests don't run very long.
091  protected static final long SLEEP_TIME_DEFAULT = 50000L;
092
093  protected static int rowsInIteration;
094  protected static int regionsCountPerServer;
095  protected static int regionServerCount;
096
097  protected static int numIterations;
098  protected static int numTables;
099  protected static TableName[] tableNames;
100  protected long sleepTime;
101  protected static Object lock = new Object();
102
103  private static String BACKUP_ROOT_DIR = "backupIT";
104
105  /*
106   * This class is used to run the backup and restore thread(s). Throwing an exception in this
107   * thread will not cause the test to fail, so the purpose of this class is to both kick off the
108   * backup and restore and record any exceptions that occur so they can be thrown in the main
109   * thread.
110   */
111  protected class BackupAndRestoreThread implements Runnable {
112    private final TableName table;
113    private Exception exc;
114
115    public BackupAndRestoreThread(TableName table) {
116      this.table = table;
117      this.exc = null;
118    }
119
120    public Exception getException() {
121      return this.exc;
122    }
123
124    @Override
125    public void run() {
126      try {
127        runTestSingle(this.table);
128      } catch (Exception e) {
129        LOG.error(
130          "An exception occurred in thread {} when performing a backup and restore with table {}: ",
131          Thread.currentThread().getName(), this.table.getNameAsString(), e);
132        this.exc = e;
133      }
134    }
135  }
136
137  @Override
138  @Before
139  public void setUp() throws Exception {
140    util = new IntegrationTestingUtility();
141    Configuration conf = util.getConfiguration();
142    regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
143    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
144    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
145    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
146    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
147    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
148    enableBackup(conf);
149    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
150    util.initializeCluster(regionServerCount);
151    LOG.info("Cluster initialized and ready");
152  }
153
154  @After
155  public void tearDown() throws IOException {
156    LOG.info("Cleaning up after test.");
157    if (util.isDistributedCluster()) {
158      deleteTablesIfAny();
159      LOG.info("Cleaning up after test. Deleted tables");
160      cleanUpBackupDir();
161    }
162    LOG.info("Restoring cluster.");
163    util.restoreCluster();
164    LOG.info("Cluster restored.");
165  }
166
167  @Override
168  public void setUpMonkey() throws Exception {
169    Policy p =
170      new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime));
171    this.monkey = new PolicyBasedChaosMonkey(util, p);
172    startMonkey();
173  }
174
175  private void deleteTablesIfAny() throws IOException {
176    for (TableName table : tableNames) {
177      util.deleteTableIfAny(table);
178    }
179  }
180
181  private void createTables() throws Exception {
182    tableNames = new TableName[numTables];
183    for (int i = 0; i < numTables; i++) {
184      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
185    }
186    for (TableName table : tableNames) {
187      createTable(table);
188    }
189  }
190
191  private void enableBackup(Configuration conf) {
192    // Enable backup
193    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
194    BackupManager.decorateMasterConfiguration(conf);
195    BackupManager.decorateRegionServerConfiguration(conf);
196  }
197
198  private void cleanUpBackupDir() throws IOException {
199    FileSystem fs = FileSystem.get(util.getConfiguration());
200    fs.delete(new Path(BACKUP_ROOT_DIR), true);
201  }
202
203  @Test
204  public void testBackupRestore() throws Exception {
205    BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
206    createTables();
207    runTestMulti();
208  }
209
210  private void runTestMulti() throws Exception {
211    LOG.info("IT backup & restore started");
212    Thread[] workers = new Thread[numTables];
213    BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables];
214    for (int i = 0; i < numTables; i++) {
215      final TableName table = tableNames[i];
216      BackupAndRestoreThread backupAndRestoreThread = new BackupAndRestoreThread(table);
217      backupAndRestoreThreads[i] = backupAndRestoreThread;
218      workers[i] = new Thread(backupAndRestoreThread);
219      workers[i].start();
220    }
221    // Wait for all workers to finish and check for errors
222    Exception error = null;
223    Exception threadExc;
224    for (int i = 0; i < numTables; i++) {
225      Uninterruptibles.joinUninterruptibly(workers[i]);
226      threadExc = backupAndRestoreThreads[i].getException();
227      if (threadExc == null) {
228        continue;
229      }
230      if (error == null) {
231        error = threadExc;
232      } else {
233        error.addSuppressed(threadExc);
234      }
235    }
236    // Throw any found errors after all threads have completed
237    if (error != null) {
238      throw error;
239    }
240    LOG.info("IT backup & restore finished");
241  }
242
243  private void createTable(TableName tableName) throws Exception {
244    long startTime, endTime;
245
246    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
247
248    TableDescriptor desc = builder.build();
249    ColumnFamilyDescriptorBuilder cbuilder =
250      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
251    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
252    LOG.info("Creating table {} with {} splits.", tableName,
253      regionsCountPerServer * regionServerCount);
254    startTime = EnvironmentEdgeManager.currentTime();
255    createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer);
256    util.waitTableAvailable(tableName);
257    endTime = EnvironmentEdgeManager.currentTime();
258    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
259  }
260
261  private void loadData(TableName table, int numRows) throws IOException {
262    Connection conn = util.getConnection();
263    // #0- insert some data to a table
264    Table t1 = conn.getTable(table);
265    util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows);
266    // flush table
267    conn.getAdmin().flush(TableName.valueOf(table.getName()));
268  }
269
270  private String backup(BackupRequest request, BackupAdmin client) throws IOException {
271    return client.backupTables(request);
272  }
273
274  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
275    client.restore(request);
276  }
277
278  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
279    client.mergeBackups(backupIds);
280  }
281
282  private void runTestSingle(TableName table) throws IOException {
283
284    List<String> backupIds = new ArrayList<String>();
285
286    try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
287      BackupAdmin client = new BackupAdminImpl(conn);) {
288
289      // #0- insert some data to table 'table'
290      loadData(table, rowsInIteration);
291
292      // #1 - create full backup for table first
293      LOG.info("create full backup image for {}", table);
294      List<TableName> tables = Lists.newArrayList(table);
295      BackupRequest.Builder builder = new BackupRequest.Builder();
296      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
297        .withTargetRootDir(BACKUP_ROOT_DIR).build();
298
299      String backupIdFull = backup(request, client);
300      assertTrue(checkSucceeded(backupIdFull));
301
302      backupIds.add(backupIdFull);
303      // Now continue with incremental backups
304      int count = 1;
305      while (count++ < numIterations) {
306
307        // Load data
308        loadData(table, rowsInIteration);
309        // Do incremental backup
310        builder = new BackupRequest.Builder();
311        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
312          .withTargetRootDir(BACKUP_ROOT_DIR).build();
313        String backupId = backup(request, client);
314        assertTrue(checkSucceeded(backupId));
315        backupIds.add(backupId);
316
317        // Restore incremental backup for table, with overwrite for previous backup
318        String previousBackupId = backupIds.get(backupIds.size() - 2);
319        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
320        // Restore incremental backup for table, with overwrite for last backup
321        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
322      }
323      // Now merge all incremental and restore
324      String[] incBackupIds = allIncremental(backupIds);
325      merge(incBackupIds, client);
326      // Restore last one
327      String backupId = incBackupIds[incBackupIds.length - 1];
328      // restore incremental backup for table, with overwrite
329      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
330      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
331        true), client);
332      Table hTable = conn.getTable(table);
333      Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations);
334      hTable.close();
335      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1));
336    }
337  }
338
339  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
340    String backupId, long expectedRows) throws IOException {
341    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
342    restore(
343      createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true),
344      client);
345    Table hTable = conn.getTable(table);
346    Assert.assertEquals(expectedRows, util.countRows(hTable));
347    hTable.close();
348  }
349
350  private String[] allIncremental(List<String> backupIds) {
351    int size = backupIds.size();
352    backupIds = backupIds.subList(1, size);
353    String[] arr = new String[size - 1];
354    backupIds.toArray(arr);
355    return arr;
356  }
357
358  /** Returns status of backup */
359  protected boolean checkSucceeded(String backupId) throws IOException {
360    BackupInfo status = getBackupInfo(backupId);
361    if (status == null) {
362      return false;
363    }
364    return status.getState() == BackupState.COMPLETE;
365  }
366
367  private BackupInfo getBackupInfo(String backupId) throws IOException {
368    try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
369      return table.readBackupInfo(backupId);
370    }
371  }
372
373  /**
374   * Get restore request.
375   * @param backupRootDir directory where backup is located
376   * @param backupId      backup ID
377   * @param check         check the backup
378   * @param fromTables    table names to restore from
379   * @param toTables      new table names to restore to
380   * @param isOverwrite   overwrite the table(s)
381   * @return an instance of RestoreRequest
382   */
383  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
384    TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
385    RestoreRequest.Builder builder = new RestoreRequest.Builder();
386    return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
387      .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
388  }
389
390  @Override
391  public void setUpCluster() throws Exception {
392    util = getTestingUtil(getConf());
393    enableBackup(getConf());
394    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
395    util.initializeCluster(regionServerCount);
396    LOG.debug("Done initializing/checking cluster");
397  }
398
399  /** Returns status of CLI execution */
400  @Override
401  public int runTestFromCommandLine() throws Exception {
402    // Check if backup is enabled
403    if (!BackupManager.isBackupEnabled(getConf())) {
404      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
405      return -1;
406    }
407    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
408    testBackupRestore();
409    return 0;
410  }
411
412  @Override
413  public TableName getTablename() {
414    // That is only valid when Monkey is CALM (no monkey)
415    return null;
416  }
417
418  @Override
419  protected Set<String> getColumnFamilies() {
420    // That is only valid when Monkey is CALM (no monkey)
421    return null;
422  }
423
424  @Override
425  protected void addOptions() {
426    addOptWithArg(REGIONSERVER_COUNT_KEY,
427      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
428    addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT);
429    addOptWithArg(ROWS_PER_ITERATION_KEY,
430      "Total number of data rows to be loaded during one iteration." + " Default: "
431        + DEFAULT_ROWS_IN_ITERATION);
432    addOptWithArg(NUM_ITERATIONS_KEY,
433      "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
434    addOptWithArg(NUMBER_OF_TABLES_KEY,
435      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
436    addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms "
437      + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT);
438  }
439
440  @Override
441  protected void processOptions(CommandLine cmd) {
442    super.processOptions(cmd);
443    regionsCountPerServer = Integer
444      .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT)));
445    regionServerCount = Integer.parseInt(
446      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
447    rowsInIteration = Integer.parseInt(
448      cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
449    numIterations = Integer
450      .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS)));
451    numTables = Integer.parseInt(
452      cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
453    sleepTime =
454      Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT)));
455
456    LOG.info(MoreObjects.toStringHelper("Parsed Options")
457      .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount)
458      .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations)
459      .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString());
460  }
461
462  public static void main(String[] args) throws Exception {
463    Configuration conf = HBaseConfiguration.create();
464    IntegrationTestingUtility.setUseDistributedCluster(conf);
465    int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args);
466    System.exit(status);
467  }
468}