/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An integration test to detect regressions in HBASE-7912. Creates a table with many
 * regions, loads data, performs a series of backup and restore operations, then
 * restores and verifies the data.
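 * <p>
 * A sketch of a command-line invocation; the option names are the ones registered in
 * {@code addOptions()}, while the launcher command and the values shown are
 * illustrative:
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestBackupRestore \
 *     -region_servers 5 -regions_per_rs 10 -num_iterations 10 \
 *     -rows_in_iteration 500000 -num_tables 1 -sleeptime 50000
 * </pre>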
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBackupRestore extends IntegrationTestBase {
  private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
  protected static final String COLUMN_NAME = "f";
  protected static final String REGION_COUNT_KEY = "regions_per_rs";
  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
  protected static final int DEFAULT_REGION_COUNT = 10;
  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
  protected static final int DEFAULT_NUM_ITERATIONS = 10;
  protected static final int DEFAULT_ROWS_IN_ITERATION = 500000;
  protected static final String SLEEP_TIME_KEY = "sleeptime";
  // short default interval because tests don't run very long.
  protected static final long SLEEP_TIME_DEFAULT = 50000L;

  protected static int rowsInIteration;
  protected static int regionsCountPerServer;
  protected static int regionServerCount;

  protected static int numIterations;
  protected static int numTables;
  protected static TableName[] tableNames;
  protected long sleepTime;
  protected static Object lock = new Object();

  private static String BACKUP_ROOT_DIR = "backupIT";

  @Override
  @Before
  public void setUp() throws Exception {
    util = new IntegrationTestingUtility();
    Configuration conf = util.getConfiguration();
    regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
    enableBackup(conf);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.info("Cluster initialized and ready");
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      LOG.info("Deleted test tables.");
      cleanUpBackupDir();
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

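  /**
   * Starts a chaos monkey that periodically (every {@code sleeptime} ms) restarts a
   * random region server, excluding the server carrying hbase:meta, while the
   * backup/restore workers run.
   */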
  @Override
  public void setUpMonkey() throws Exception {
    Policy p = new PeriodicRandomActionPolicy(sleepTime,
      new RestartRandomRsExceptMetaAction(sleepTime));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    startMonkey();
  }

  private void deleteTablesIfAny() throws IOException {
    for (TableName table : tableNames) {
      util.deleteTableIfAny(table);
    }
  }

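  /**
   * Creates {@code numTables} pre-split tables, named {@code CLASS_NAME + ".table." + i}.
   */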
  private void createTables() throws Exception {
    tableNames = new TableName[numTables];
    for (int i = 0; i < numTables; i++) {
      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
    }
    for (TableName table : tableNames) {
      createTable(table);
    }
  }

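  /**
   * Turns the backup feature on and applies the backup-specific master and region
   * server configuration decorations.
   */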
  private void enableBackup(Configuration conf) {
    // Enable backup
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf);
    BackupManager.decorateRegionServerConfiguration(conf);
  }

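  /**
   * Deletes the backup root directory on the test filesystem.
   */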
  private void cleanUpBackupDir() throws IOException {
    FileSystem fs = FileSystem.get(util.getConfiguration());
    fs.delete(new Path(BACKUP_ROOT_DIR), true);
  }

  @Test
  public void testBackupRestore() throws Exception {
    BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
    createTables();
    runTestMulti();
  }

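  /**
   * Runs the single-table backup/restore scenario concurrently, one worker thread per
   * table, and fails the test if any worker failed.
   */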
  private void runTestMulti() throws IOException {
    LOG.info("IT backup & restore started");
    Thread[] workers = new Thread[numTables];
    // Record the first worker failure; an assertion thrown inside a worker thread
    // would otherwise be swallowed by that thread and the test would pass spuriously.
    final AtomicReference<Throwable> error = new AtomicReference<>();
    for (int i = 0; i < numTables; i++) {
      final TableName table = tableNames[i];
      Runnable r = () -> {
        try {
          runTestSingle(table);
        } catch (IOException e) {
          LOG.error("Failed", e);
          error.compareAndSet(null, e);
        }
      };
      workers[i] = new Thread(r);
      workers[i].start();
    }
    // Wait for all workers to finish
    for (Thread t : workers) {
      Uninterruptibles.joinUninterruptibly(t);
    }
    if (error.get() != null) {
      Assert.fail(error.get().getMessage());
    }
    LOG.info("IT backup & restore finished");
  }

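  /**
   * Creates a pre-split table with a single column family; the total region count is
   * {@code regionsCountPerServer * regionServerCount}.
   */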
  private void createTable(TableName tableName) throws Exception {
    long startTime, endTime;

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    TableDescriptor desc = builder.build();
    ColumnFamilyDescriptorBuilder cbuilder =
        ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
    LOG.info("Creating table {} with {} splits.", tableName,
      regionsCountPerServer * regionServerCount);
    startTime = EnvironmentEdgeManager.currentTime();
    HBaseTestingUtil.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns,
      regionsCountPerServer);
    util.waitTableAvailable(tableName);
    endTime = EnvironmentEdgeManager.currentTime();
    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
  }

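  /**
   * Loads {@code numRows} random rows into the table, then flushes it so the data is
   * persisted to HFiles.
   */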
  private void loadData(TableName table, int numRows) throws IOException {
    Connection conn = util.getConnection();
    // insert some data into the table, then flush it; close the Table and Admin
    // handles we open here so they do not leak
    try (Table t1 = conn.getTable(table); Admin admin = conn.getAdmin()) {
      util.loadRandomRows(t1, COLUMN_NAME.getBytes(Charset.defaultCharset()), 100, numRows);
      admin.flush(table);
    }
  }

  private String backup(BackupRequest request, BackupAdmin client) throws IOException {
    return client.backupTables(request);
  }

  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
    client.restore(request);
  }

  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
    client.mergeBackups(backupIds);
  }

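  /**
   * Runs one backup/restore scenario against a single table: loads data and takes a
   * full backup; then, per iteration, loads more data, takes an incremental backup,
   * and restores/verifies both the previous and the latest images; finally merges all
   * incremental backups and restores/verifies the merged image.
   */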
  private void runTestSingle(TableName table) throws IOException {
    List<String> backupIds = new ArrayList<>();
    List<Integer> tableSizes = new ArrayList<>();

    try (Connection conn = util.getConnection();
        Admin admin = conn.getAdmin();
        BackupAdmin client = new BackupAdminImpl(conn)) {

      // #0 - insert some data into the table
      loadData(table, rowsInIteration);
      tableSizes.add(rowsInIteration);

      // #1 - create a full backup of the table first
      LOG.info("create full backup image for {}", table);
      List<TableName> tables = Lists.newArrayList(table);
      BackupRequest.Builder builder = new BackupRequest.Builder();
      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
          .withTargetRootDir(BACKUP_ROOT_DIR).build();

      String backupIdFull = backup(request, client);
      assertTrue(checkSucceeded(backupIdFull));
      backupIds.add(backupIdFull);

      // Now continue with incremental backups
      int count = 1;
      while (count++ < numIterations) {
        // Load data
        loadData(table, rowsInIteration);
        tableSizes.add(rowsInIteration * count);
        // Do an incremental backup
        builder = new BackupRequest.Builder();
        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
            .withTargetRootDir(BACKUP_ROOT_DIR).build();
        String backupId = backup(request, client);
        assertTrue(checkSucceeded(backupId));
        backupIds.add(backupId);

        // Restore the previous incremental backup for the table, with overwrite
        String previousBackupId = backupIds.get(backupIds.size() - 2);
        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
        // Restore the latest incremental backup for the table, with overwrite
        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
      }
      // Now merge all incremental backups and restore the merged image
      String[] incBackupIds = allIncremental(backupIds);
      merge(incBackupIds, client);
      // Restore the last backup (the merged image) for the table, with overwrite
      String backupId = incBackupIds[incBackupIds.length - 1];
      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
        true), client);
      Table hTable = conn.getTable(table);
      Assert.assertEquals(rowsInIteration * numIterations, util.countRows(hTable));
      hTable.close();
      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1));
    }
  }

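  /**
   * Restores the given backup image into the table (with overwrite) and verifies that
   * the table contains the expected number of rows.
   */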
  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
      String backupId, long expectedRows) throws IOException {
    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
    restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
      tablesRestoreIncMultiple, null, true), client);
    Table hTable = conn.getTable(table);
    Assert.assertEquals(expectedRows, util.countRows(hTable));
    hTable.close();
  }

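  /**
   * Returns all incremental backup IDs, i.e. every ID in the list except the first,
   * which is the full backup.
   */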
  private String[] allIncremental(List<String> backupIds) {
    int size = backupIds.size();
    backupIds = backupIds.subList(1, size);
    String[] arr = new String[size - 1];
    backupIds.toArray(arr);
    return arr;
  }

  /**
   * Checks whether the backup with the given ID completed successfully.
   * @param backupId backup ID to check the status of
   * @return true if the backup state is COMPLETE, false otherwise
   * @throws IOException if the backup system table cannot be read
   */
  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.COMPLETE;
  }

  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  /**
   * Creates a restore request.
   *
   * @param backupRootDir directory where the backup is located
   * @param backupId backup ID
   * @param check if true, only check that the restore is feasible; do not perform it
   * @param fromTables table names to restore from
   * @param toTables new table names to restore to ({@code null} keeps the original names)
   * @param isOverwrite whether to overwrite existing tables on restore
   * @return an instance of RestoreRequest
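   *
   * <p>A minimal usage sketch, mirroring how this test calls the method (the table
   * and backup ID are illustrative):
   * <pre>
   * RestoreRequest req = createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
   *   new TableName[] { table }, null, true);
   * </pre>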
   */
  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
      TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
    RestoreRequest.Builder builder = new RestoreRequest.Builder();
    return builder.withBackupRootDir(backupRootDir)
        .withBackupId(backupId)
        .withCheck(check)
        .withFromTables(fromTables)
        .withToTables(toTables)
        .withOvewrite(isOverwrite).build();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    enableBackup(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /**
   * Runs the test from the command line.
   * @return status of the CLI execution: 0 on success, -1 if backup is not enabled
   */
  @Override
  public int runTestFromCommandLine() throws Exception {
    // Check if backup is enabled
    if (!BackupManager.isBackupEnabled(getConf())) {
      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
      return -1;
    }
    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
    testBackupRestore();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // Only valid when the monkey is CALM (no monkey); this test manages its own tables.
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // Only valid when the monkey is CALM (no monkey).
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(REGION_COUNT_KEY,
      "Number of regions per region server. Default: " + DEFAULT_REGION_COUNT);
    addOptWithArg(ROWS_PER_ITERATION_KEY,
      "Total number of data rows to load during one iteration. Default: "
          + DEFAULT_ROWS_IN_ITERATION);
    addOptWithArg(NUM_ITERATIONS_KEY,
      "Total number of iterations. Default: " + DEFAULT_NUM_ITERATIONS);
    addOptWithArg(NUMBER_OF_TABLES_KEY,
      "Total number of tables in the test. Default: " + DEFAULT_NUMBER_OF_TABLES);
    addOptWithArg(SLEEP_TIME_KEY,
      "Chaos monkey sleep time in ms between random region server restarts. Default: "
          + SLEEP_TIME_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    regionsCountPerServer = Integer.parseInt(
      cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT)));
    regionServerCount = Integer.parseInt(
      cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
    rowsInIteration = Integer.parseInt(
      cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
    numIterations = Integer.parseInt(
      cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS)));
    numTables = Integer.parseInt(
      cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
    sleepTime = Long.parseLong(
      cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT)));

    LOG.info(MoreObjects.toStringHelper("Parsed Options")
      .add(REGION_COUNT_KEY, regionsCountPerServer)
      .add(REGIONSERVER_COUNT_KEY, regionServerCount)
      .add(ROWS_PER_ITERATION_KEY, rowsInIteration)
      .add(NUM_ITERATIONS_KEY, numIterations)
      .add(NUMBER_OF_TABLES_KEY, numTables)
      .add(SLEEP_TIME_KEY, sleepTime)
      .toString());
  }

  /**
   * Command-line entry point.
   * @param args argument list
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args);
    System.exit(status);
  }
}