001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup; 019 020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import org.apache.commons.lang3.StringUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; 035import org.apache.hadoop.hbase.backup.impl.BackupCommands; 036import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 037import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; 038import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; 039import org.apache.hadoop.hbase.backup.util.BackupUtils; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.util.Pair; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 052 053@Tag(LargeTests.TAG) 054public class TestIncrementalBackupMergeWithFailures extends TestBackupBase { 055 056 private static final Logger LOG = 057 LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class); 058 059 enum FailurePhase { 060 PHASE1, 061 PHASE2, 062 PHASE3, 063 PHASE4 064 } 065 066 public final static String FAILURE_PHASE_KEY = "failurePhase"; 067 068 static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob { 069 FailurePhase failurePhase; 070 071 @Override 072 public void setConf(Configuration conf) { 073 super.setConf(conf); 074 String val = conf.get(FAILURE_PHASE_KEY); 075 if (val != null) { 076 failurePhase = FailurePhase.valueOf(val); 077 } else { 078 fail("Failure phase is not set"); 079 } 080 } 081 082 /** 083 * This is the exact copy of parent's run() with injections of different types of failures 084 */ 085 @Override 086 public void run(String[] backupIds) throws IOException { 087 String bulkOutputConfKey; 088 089 // TODO : run player on remote cluster 090 player = new MapReduceHFileSplitterJob(); 091 bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; 092 // Player reads all files in arbitrary directory structure and creates 093 // a Map task for each file 094 String bids = StringUtils.join(backupIds, ","); 095 096 if (LOG.isDebugEnabled()) { 097 LOG.debug("Merge backup images " + bids); 098 } 099 100 List<Pair<TableName, Path>> processedTableList = new ArrayList<>(); 101 boolean finishedTables = false; 102 Connection conn = ConnectionFactory.createConnection(getConf()); 103 BackupSystemTable table = new BackupSystemTable(conn); 104 FileSystem fs = FileSystem.get(getConf()); 105 106 try { 107 // Start backup exclusive operation 108 table.startBackupExclusiveOperation(); 109 // Start merge operation 110 table.startMergeOperation(backupIds); 111 112 // Select most recent backup id 113 String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds); 114 115 TableName[] tableNames = getTableNamesInBackupImages(backupIds); 116 117 BackupInfo bInfo = table.readBackupInfo(backupIds[0]); 118 String backupRoot = bInfo.getBackupRootDir(); 119 // PHASE 1 120 checkFailure(FailurePhase.PHASE1); 121 122 for (int i = 0; i < tableNames.length; i++) { 123 LOG.info("Merge backup images for " + tableNames[i]); 124 125 // Find input directories for table 126 Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); 127 String dirs = StringUtils.join(dirPaths, ","); 128 Path bulkOutputPath = BackupUtils.getBulkOutputDir( 129 BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false); 130 // Delete content if exists 131 if (fs.exists(bulkOutputPath)) { 132 if (!fs.delete(bulkOutputPath, true)) { 133 LOG.warn("Can not delete: " + bulkOutputPath); 134 } 135 } 136 Configuration conf = getConf(); 137 conf.set(bulkOutputConfKey, bulkOutputPath.toString()); 138 String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; 139 140 // PHASE 2 141 checkFailure(FailurePhase.PHASE2); 142 player.setConf(getConf()); 143 int result = player.run(playerArgs); 144 if (succeeded(result)) { 145 // Add to processed table list 146 processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath)); 147 } else { 148 throw new IOException("Can not merge backup images for " + dirs 149 + " (check Hadoop/MR and HBase logs). Player return code =" + result); 150 } 151 LOG.debug("Merge Job finished:" + result); 152 } 153 List<TableName> tableList = toTableNameList(processedTableList); 154 // PHASE 3 155 checkFailure(FailurePhase.PHASE3); 156 table.updateProcessedTablesForMerge(tableList); 157 finishedTables = true; 158 159 // (modification of a backup file system) 160 // Move existing mergedBackupId data into tmp directory 161 // we will need it later in case of a failure 162 Path tmpBackupDir = 163 HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId); 164 Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId); 165 if (!fs.rename(backupDirPath, tmpBackupDir)) { 166 throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir); 167 } else { 168 LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir); 169 } 170 // Move new data into backup dest 171 for (Pair<TableName, Path> tn : processedTableList) { 172 moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); 173 } 174 checkFailure(FailurePhase.PHASE4); 175 // Update backup manifest 176 List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); 177 updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete); 178 // Copy meta files back from tmp to backup dir 179 copyMetaData(fs, tmpBackupDir, backupDirPath); 180 // Delete tmp dir (Rename back during repair) 181 if (!fs.delete(tmpBackupDir, true)) { 182 // WARN and ignore 183 LOG.warn("Could not delete tmp dir: " + tmpBackupDir); 184 } 185 // Delete old data 186 deleteBackupImages(backupsToDelete, conn, fs, backupRoot); 187 // Finish merge session 188 table.finishMergeOperation(); 189 // Release lock 190 table.finishBackupExclusiveOperation(); 191 } catch (RuntimeException e) { 192 throw e; 193 } catch (Exception e) { 194 LOG.error(e.toString(), e); 195 if (!finishedTables) { 196 // cleanup bulk directories and finish merge 197 // merge MUST be repeated (no need for repair) 198 cleanupBulkLoadDirs(fs, toPathList(processedTableList)); 199 table.finishMergeOperation(); 200 table.finishBackupExclusiveOperation(); 201 throw new IOException("Backup merge operation failed, you should try it again", e); 202 } else { 203 // backup repair must be run 204 throw new IOException( 205 "Backup merge operation failed, run backup repair tool to restore system's integrity", 206 e); 207 } 208 } finally { 209 table.close(); 210 conn.close(); 211 } 212 } 213 214 private void checkFailure(FailurePhase phase) throws IOException { 215 if (failurePhase != null && failurePhase == phase) { 216 throw new IOException(phase.toString()); 217 } 218 } 219 } 220 221 @Test 222 public void TestIncBackupMergeRestore() throws Exception { 223 int ADD_ROWS = 99; 224 // #1 - create full backup for all tables 225 LOG.info("create full backup image for all tables"); 226 227 List<TableName> tables = Lists.newArrayList(table1, table2); 228 // Set custom Merge Job implementation 229 conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS, 230 BackupMergeJobWithFailures.class, BackupMergeJob.class); 231 232 Connection conn = ConnectionFactory.createConnection(conf1); 233 234 Admin admin = conn.getAdmin(); 235 BackupAdminImpl client = new BackupAdminImpl(conn); 236 237 BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); 238 String backupIdFull = client.backupTables(request); 239 240 assertTrue(checkSucceeded(backupIdFull)); 241 242 // #2 - insert some data to table1 243 Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); 244 LOG.debug("writing " + ADD_ROWS + " rows to " + table1); 245 246 assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1)); 247 t1.close(); 248 LOG.debug("written " + ADD_ROWS + " rows to " + table1); 249 250 Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); 251 252 assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2)); 253 t2.close(); 254 LOG.debug("written " + ADD_ROWS + " rows to " + table2); 255 256 // #3 - incremental backup for multiple tables 257 tables = Lists.newArrayList(table1, table2); 258 request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); 259 String backupIdIncMultiple = client.backupTables(request); 260 261 assertTrue(checkSucceeded(backupIdIncMultiple)); 262 263 t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS); 264 t1.close(); 265 266 t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS); 267 t2.close(); 268 269 // #3 - incremental backup for multiple tables 270 request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); 271 String backupIdIncMultiple2 = client.backupTables(request); 272 assertTrue(checkSucceeded(backupIdIncMultiple2)); 273 // #4 Merge backup images with failures 274 275 for (FailurePhase phase : FailurePhase.values()) { 276 Configuration conf = conn.getConfiguration(); 277 278 conf.set(FAILURE_PHASE_KEY, phase.toString()); 279 280 try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { 281 String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; 282 bAdmin.mergeBackups(backups); 283 fail("Expected IOException"); 284 } catch (IOException e) { 285 BackupSystemTable table = new BackupSystemTable(conn); 286 if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) { 287 // No need to repair: 288 // Both Merge and backup exclusive operations are finished 289 assertFalse(table.isMergeInProgress()); 290 try { 291 table.finishBackupExclusiveOperation(); 292 fail("IOException is expected"); 293 } catch (IOException ee) { 294 // Expected 295 } 296 } else { 297 // Repair is required 298 assertTrue(table.isMergeInProgress()); 299 try { 300 table.startBackupExclusiveOperation(); 301 fail("IOException is expected"); 302 } catch (IOException ee) { 303 // Expected - clean up before proceeding 304 // table.finishMergeOperation(); 305 // table.finishBackupExclusiveOperation(); 306 } 307 } 308 table.close(); 309 LOG.debug("Expected :" + e.getMessage()); 310 } 311 } 312 // Now merge w/o failures 313 Configuration conf = conn.getConfiguration(); 314 conf.unset(FAILURE_PHASE_KEY); 315 conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS); 316 // Now run repair 317 BackupSystemTable sysTable = new BackupSystemTable(conn); 318 BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable); 319 // Now repeat merge 320 try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { 321 String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; 322 bAdmin.mergeBackups(backups); 323 } 324 325 // #6 - restore incremental backup for multiple tables, with overwrite 326 TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 }; 327 TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore }; 328 client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false, 329 tablesRestoreIncMultiple, tablesMapIncMultiple, true)); 330 331 Table hTable = conn.getTable(table1_restore); 332 LOG.debug("After incremental restore: " + hTable.getDescriptor()); 333 LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); 334 assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName)); 335 336 hTable.close(); 337 338 hTable = conn.getTable(table2_restore); 339 assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable)); 340 hTable.close(); 341 342 admin.close(); 343 conn.close(); 344 } 345}