/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * An integration test to detect regressions in HBASE-7912.
 * Creates a table with many regions, loads data, performs a series of backup/load operations,
 * then restores and verifies the data.
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
 * @see <a href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
 */
@Category(IntegrationTests.class)
public class IntegrationTestBackupRestore extends IntegrationTestBase {
  private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
  protected static final String COLUMN_NAME = "f";
  protected static final String REGION_COUNT_KEY = "regions_per_rs";
  protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
  protected static final int DEFAULT_REGION_COUNT = 10;
  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
  protected static final int DEFAULT_NUM_ITERATIONS = 10;
  protected static final int DEFAULT_ROWS_IN_ITERATION = 10000;
  protected static final String SLEEP_TIME_KEY = "sleeptime";
  // Short default interval because tests don't run very long.
  protected static final long SLEEP_TIME_DEFAULT = 50000L;

  protected static int rowsInIteration;
  protected static int regionsCountPerServer;
  protected static int regionServerCount;

  protected static int numIterations;
  protected static int numTables;
  protected static TableName[] tableNames;
  protected long sleepTime;
  protected static Object lock = new Object();

  private static String BACKUP_ROOT_DIR = "backupIT";

  /*
   * This class is used to run the backup and restore thread(s). Throwing an exception in this
   * thread will not cause the test to fail, so the purpose of this class is to both kick off the
   * backup and restore and record any exceptions that occur so they can be thrown in the main
   * thread.
   */
  protected class BackupAndRestoreThread implements Runnable {
    private final TableName table;
    private Exception exc;

    public BackupAndRestoreThread(TableName table) {
      this.table = table;
      this.exc = null;
    }

    public Exception getException() {
      return this.exc;
    }

    @Override
    public void run() {
      try {
        runTestSingle(this.table);
      } catch (Exception e) {
        LOG.error(
          "An exception occurred in thread {} when performing a backup and restore with table {}: ",
          Thread.currentThread().getName(), this.table.getNameAsString(), e);
        this.exc = e;
      }
    }
  }

  @Override
  @Before
  public void setUp() throws Exception {
    util = new IntegrationTestingUtility();
    Configuration conf = util.getConfiguration();
    regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
    regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
    enableBackup(conf);
    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.info("Cluster initialized and ready");
  }

  @After
  public void tearDown() throws IOException {
    LOG.info("Cleaning up after test.");
    if (util.isDistributedCluster()) {
      deleteTablesIfAny();
      LOG.info("Deleted test tables.");
      cleanUpBackupDir();
    }
    LOG.info("Restoring cluster.");
    util.restoreCluster();
    LOG.info("Cluster restored.");
  }

  @Override
  public void setUpMonkey() throws Exception {
    Policy p =
      new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    startMonkey();
  }

  private void deleteTablesIfAny() throws IOException {
    for (TableName table : tableNames) {
      util.deleteTableIfAny(table);
    }
  }

  private void createTables() throws Exception {
    tableNames = new TableName[numTables];
    for (int i = 0; i < numTables; i++) {
      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
    }
    for (TableName table : tableNames) {
      createTable(table);
    }
  }
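  /**
   * Turns on the backup feature in the given configuration and applies the master- and
   * region-server-side configuration decorations that the backup subsystem requires.
   */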
  private void enableBackup(Configuration conf) {
    // Enable backup
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf);
    BackupManager.decorateRegionServerConfiguration(conf);
  }

  private void cleanUpBackupDir() throws IOException {
    FileSystem fs = FileSystem.get(util.getConfiguration());
    fs.delete(new Path(BACKUP_ROOT_DIR), true);
  }

  @Test
  public void testBackupRestore() throws Exception {
    BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
    createTables();
    runTestMulti();
  }

  private void runTestMulti() throws Exception {
    LOG.info("IT backup & restore started");
    Thread[] workers = new Thread[numTables];
    BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables];
    for (int i = 0; i < numTables; i++) {
      final TableName table = tableNames[i];
      BackupAndRestoreThread backupAndRestoreThread = new BackupAndRestoreThread(table);
      backupAndRestoreThreads[i] = backupAndRestoreThread;
      workers[i] = new Thread(backupAndRestoreThread);
      workers[i].start();
    }
    // Wait for all workers to finish and check for errors
    Exception error = null;
    Exception threadExc;
    for (int i = 0; i < numTables; i++) {
      Uninterruptibles.joinUninterruptibly(workers[i]);
      threadExc = backupAndRestoreThreads[i].getException();
      if (threadExc == null) {
        continue;
      }
      if (error == null) {
        error = threadExc;
      } else {
        error.addSuppressed(threadExc);
      }
    }
    // Throw any found errors after all threads have completed
    if (error != null) {
      throw error;
    }
    LOG.info("IT backup & restore finished");
  }

  private void createTable(TableName tableName) throws Exception {
    long startTime, endTime;

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);

    TableDescriptor desc = builder.build();
    ColumnFamilyDescriptorBuilder cbuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
    LOG.info("Creating table {} with {} splits.", tableName,
      regionsCountPerServer * regionServerCount);
    startTime = EnvironmentEdgeManager.currentTime();
    createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer);
    util.waitTableAvailable(tableName);
    endTime = EnvironmentEdgeManager.currentTime();
    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
  }

  private void loadData(TableName table, int numRows) throws IOException {
    Connection conn = util.getConnection();
    // Insert random rows into the table, then flush so the data is persisted and visible
    // to subsequent (incremental) backups. Close the Table and Admin when done.
    try (Table t1 = conn.getTable(table); Admin admin = conn.getAdmin()) {
      util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows);
      admin.flush(table);
    }
  }

  private String backup(BackupRequest request, BackupAdmin client) throws IOException {
    return client.backupTables(request);
  }

  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
    client.restore(request);
  }

  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
    client.mergeBackups(backupIds);
  }
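  /**
   * Runs the backup/restore cycle for a single table: take a full backup, then for each iteration
   * load more rows and take an incremental backup, restoring and verifying both the previous and
   * the latest image. Finally, merge all incremental backups and restore and verify the merged
   * image.
   */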
  private void runTestSingle(TableName table) throws IOException {
    List<String> backupIds = new ArrayList<>();

    try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
      BackupAdmin client = new BackupAdminImpl(conn)) {

      // #0 - insert some data into table 'table'
      loadData(table, rowsInIteration);

      // #1 - create full backup for the table first
      LOG.info("create full backup image for {}", table);
      List<TableName> tables = Lists.newArrayList(table);
      BackupRequest.Builder builder = new BackupRequest.Builder();
      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
        .withTargetRootDir(BACKUP_ROOT_DIR).build();

      String backupIdFull = backup(request, client);
      assertTrue(checkSucceeded(backupIdFull));

      backupIds.add(backupIdFull);
      // Now continue with incremental backups
      int count = 1;
      while (count++ < numIterations) {

        // Load data
        loadData(table, rowsInIteration);
        // Do incremental backup
        builder = new BackupRequest.Builder();
        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
          .withTargetRootDir(BACKUP_ROOT_DIR).build();
        String backupId = backup(request, client);
        assertTrue(checkSucceeded(backupId));
        backupIds.add(backupId);

        // Restore incremental backup for the table, with overwrite, for the previous backup
        String previousBackupId = backupIds.get(backupIds.size() - 2);
        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
        // Restore incremental backup for the table, with overwrite, for the last backup
        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
      }
      // Now merge all incremental backups and restore
      String[] incBackupIds = allIncremental(backupIds);
      merge(incBackupIds, client);
      // Restore the last one
      String backupId = incBackupIds[incBackupIds.length - 1];
      // Restore incremental backup for the table, with overwrite
      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
        true), client);
      try (Table hTable = conn.getTable(table)) {
        Assert.assertEquals(rowsInIteration * numIterations, util.countRows(hTable));
      }
      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1));
    }
  }

  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
    String backupId, long expectedRows) throws IOException {
    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
    restore(
      createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true),
      client);
    try (Table hTable = conn.getTable(table)) {
      Assert.assertEquals(expectedRows, util.countRows(hTable));
    }
  }

  private String[] allIncremental(List<String> backupIds) {
    // Drop the first (full) backup ID; everything after it is incremental.
    int size = backupIds.size();
    backupIds = backupIds.subList(1, size);
    String[] arr = new String[size - 1];
    backupIds.toArray(arr);
    return arr;
  }

  /** Returns whether the backup with the given backup ID completed successfully */
  protected boolean checkSucceeded(String backupId) throws IOException {
    BackupInfo status = getBackupInfo(backupId);
    if (status == null) {
      return false;
    }
    return status.getState() == BackupState.COMPLETE;
  }
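  /** Reads the {@link BackupInfo} record for the given backup ID from the backup system table. */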
  private BackupInfo getBackupInfo(String backupId) throws IOException {
    try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
      return table.readBackupInfo(backupId);
    }
  }

  /**
   * Get restore request.
   * @param backupRootDir directory where the backup is located
   * @param backupId      backup ID
   * @param check         if true, only check the restore sequence and dependencies; do not restore
   * @param fromTables    table names to restore from
   * @param toTables      new table names to restore to
   * @param isOverwrite   overwrite the table(s)
   * @return an instance of RestoreRequest
   */
  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
    TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
    RestoreRequest.Builder builder = new RestoreRequest.Builder();
    return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
      .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    enableBackup(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
    util.initializeCluster(regionServerCount);
    LOG.debug("Done initializing/checking cluster");
  }

  /** Returns status of CLI execution */
  @Override
  public int runTestFromCommandLine() throws Exception {
    // Check if backup is enabled
    if (!BackupManager.isBackupEnabled(getConf())) {
      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
      return -1;
    }
    System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
    testBackupRestore();
    return 0;
  }

  @Override
  public TableName getTablename() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    // That is only valid when Monkey is CALM (no monkey)
    return null;
  }

  @Override
  protected void addOptions() {
    addOptWithArg(REGIONSERVER_COUNT_KEY,
      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
    addOptWithArg(REGION_COUNT_KEY,
      "Number of regions per region server. Default: " + DEFAULT_REGION_COUNT);
    addOptWithArg(ROWS_PER_ITERATION_KEY,
      "Total number of data rows to be loaded during one iteration." + " Default: "
        + DEFAULT_ROWS_IN_ITERATION);
    addOptWithArg(NUM_ITERATIONS_KEY,
      "Total number of iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
    addOptWithArg(NUMBER_OF_TABLES_KEY,
      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
    addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms "
      + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT);
  }
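  /**
   * Parses the command-line options into the test parameters. This path is used when the test is
   * launched through ToolRunner (see {@link #main(String[])}) rather than as a JUnit test.
   */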
Default: " + SLEEP_TIME_DEFAULT); 438 } 439 440 @Override 441 protected void processOptions(CommandLine cmd) { 442 super.processOptions(cmd); 443 regionsCountPerServer = Integer 444 .parseInt(cmd.getOptionValue(REGION_COUNT_KEY, Integer.toString(DEFAULT_REGION_COUNT))); 445 regionServerCount = Integer.parseInt( 446 cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); 447 rowsInIteration = Integer.parseInt( 448 cmd.getOptionValue(ROWS_PER_ITERATION_KEY, Integer.toString(DEFAULT_ROWS_IN_ITERATION))); 449 numIterations = Integer 450 .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY, Integer.toString(DEFAULT_NUM_ITERATIONS))); 451 numTables = Integer.parseInt( 452 cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES))); 453 sleepTime = 454 Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT))); 455 456 LOG.info(MoreObjects.toStringHelper("Parsed Options") 457 .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) 458 .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) 459 .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); 460 } 461 462 public static void main(String[] args) throws Exception { 463 Configuration conf = HBaseConfiguration.create(); 464 IntegrationTestingUtility.setUseDistributedCluster(conf); 465 int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); 466 System.exit(status); 467 } 468}