/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Verifies that a backup-merge failure leaves the backup system in a recoverable state. A custom
 * {@link MapReduceBackupMergeJob} subclass injects an {@link IOException} at each of four phases of
 * the merge; the test then asserts, per phase, whether the merge/exclusive-operation locks were
 * released (phases 1-3) or whether the repair tool must be run (phase 4), and finally proves that
 * repair + a clean re-merge + restore all succeed.
 */
@Tag(LargeTests.TAG)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);

  /**
   * Points in {@link BackupMergeJobWithFailures#run(String[])} where a failure is injected.
   * PHASE1-PHASE3 occur before the backup filesystem is modified (merge can simply be retried);
   * PHASE4 occurs after data has been moved, so a repair is required.
   */
  enum FailurePhase {
    PHASE1,
    PHASE2,
    PHASE3,
    PHASE4
  }

  /** Configuration key naming the {@link FailurePhase} to inject; must always be set. */
  public final static String FAILURE_PHASE_KEY = "failurePhase";

  /**
   * Merge job that throws an {@link IOException} at the phase named by
   * {@link #FAILURE_PHASE_KEY} in its configuration.
   */
  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
    // Phase at which run() will throw; read from the configuration in setConf().
    FailurePhase failurePhase;

    /**
     * Captures the injected failure phase from the configuration. The test always sets
     * {@link #FAILURE_PHASE_KEY}, so a missing value is a test-setup error and fails fast.
     */
    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assertions.fail("Failure phase is not set");
      }
    }

    /**
     * This is the exact copy of parent's run() with injections of different types of failures
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO : run player on remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // Player reads all files in arbitrary directory structure and creates
      // a Map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      // Tables whose images were merged successfully, with their bulk-output dirs.
      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
      // True once the system table records the processed tables; after this point a
      // failure requires the repair tool rather than a plain retry.
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();
        // PHASE 1: after locks are taken but before any per-table work.
        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
            BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
          // Delete content if exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Can not delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          // PHASE 2: inside the per-table loop, before the MR player runs.
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Can not merge backup images for " + dirs
              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
          }
          LOG.debug("Merge Job finished:" + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // PHASE 3: all tables merged, but the system table not yet updated.
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // (modification of a backup file system)
        // Move existing mergedBackupId data into tmp directory
        // we will need it later in case of a failure
        Path tmpBackupDir =
          HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
        if (!fs.rename(backupDirPath, tmpBackupDir)) {
          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
        } else {
          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
        }
        // Move new data into backup dest
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        // PHASE 4: backup filesystem already modified -- a failure here is only
        // recoverable via the backup repair tool.
        checkFailure(FailurePhase.PHASE4);
        // Update backup manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
        // Copy meta files back from tmp to backup dir
        copyMetaData(fs, tmpBackupDir, backupDirPath);
        // Delete tmp dir (Rename back during repair)
        if (!fs.delete(tmpBackupDir, true)) {
          // WARN and ignore
          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
        }
        // Delete old data
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
            "Backup merge operation failed, run backup repair tool to restore system's integrity",
            e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }

    /**
     * Throws an {@link IOException} if {@code phase} is the configured injection point;
     * otherwise a no-op.
     */
    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }

  /**
   * End-to-end scenario: full backup, two rounds of inserts each followed by an incremental
   * backup, then a merge attempt per {@link FailurePhase} (each expected to fail with the
   * phase-appropriate lock state), followed by repair, a clean merge, and a verified restore.
   */
  @Test
  public void TestIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    Admin admin = conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data to table1
    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assertions.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1));
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assertions.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2));
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #3 - incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));
    // #4 Merge backup images with failures

    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        // Injected failure makes every merge attempt throw before completion.
        bAdmin.mergeBackups(backups);
        Assertions.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // Both Merge and backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            // Finishing an already-released exclusive operation must fail.
            table.finishBackupExclusiveOperation();
            Assertions.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            // Exclusive lock is still held by the failed merge, so this must fail.
            table.startBackupExclusiveOperation();
            Assertions.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected - clean up before proceeding
            // table.finishMergeOperation();
            // table.finishBackupExclusiveOperation();
          }
        }
        table.close();
        LOG.debug("Expected :" + e.getMessage());
      }
    }
    // Now merge w/o failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
    // Now run repair
    BackupSystemTable sysTable = new BackupSystemTable(conn);
    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
    // Now repeat merge
    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }

    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    // Both insert rounds must be visible after restoring the merged incremental image.
    Assertions.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName));

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assertions.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable));
    hTable.close();

    admin.close();
    conn.close();
  }
}