001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.commons.lang3.StringUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
035import org.apache.hadoop.hbase.backup.impl.BackupCommands;
036import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
037import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
038import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
039import org.apache.hadoop.hbase.backup.util.BackupUtils;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.testclassification.LargeTests;
045import org.apache.hadoop.hbase.util.Pair;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052
053@Tag(LargeTests.TAG)
054public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
055
056  private static final Logger LOG =
057    LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);
058
059  enum FailurePhase {
060    PHASE1,
061    PHASE2,
062    PHASE3,
063    PHASE4
064  }
065
066  public final static String FAILURE_PHASE_KEY = "failurePhase";
067
068  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
069    FailurePhase failurePhase;
070
071    @Override
072    public void setConf(Configuration conf) {
073      super.setConf(conf);
074      String val = conf.get(FAILURE_PHASE_KEY);
075      if (val != null) {
076        failurePhase = FailurePhase.valueOf(val);
077      } else {
078        fail("Failure phase is not set");
079      }
080    }
081
082    /**
083     * This is the exact copy of parent's run() with injections of different types of failures
084     */
085    @Override
086    public void run(String[] backupIds) throws IOException {
087      String bulkOutputConfKey;
088
089      // TODO : run player on remote cluster
090      player = new MapReduceHFileSplitterJob();
091      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
092      // Player reads all files in arbitrary directory structure and creates
093      // a Map task for each file
094      String bids = StringUtils.join(backupIds, ",");
095
096      if (LOG.isDebugEnabled()) {
097        LOG.debug("Merge backup images " + bids);
098      }
099
100      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
101      boolean finishedTables = false;
102      Connection conn = ConnectionFactory.createConnection(getConf());
103      BackupSystemTable table = new BackupSystemTable(conn);
104      FileSystem fs = FileSystem.get(getConf());
105
106      try {
107        // Start backup exclusive operation
108        table.startBackupExclusiveOperation();
109        // Start merge operation
110        table.startMergeOperation(backupIds);
111
112        // Select most recent backup id
113        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
114
115        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
116
117        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
118        String backupRoot = bInfo.getBackupRootDir();
119        // PHASE 1
120        checkFailure(FailurePhase.PHASE1);
121
122        for (int i = 0; i < tableNames.length; i++) {
123          LOG.info("Merge backup images for " + tableNames[i]);
124
125          // Find input directories for table
126          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
127          String dirs = StringUtils.join(dirPaths, ",");
128          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
129            BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
130          // Delete content if exists
131          if (fs.exists(bulkOutputPath)) {
132            if (!fs.delete(bulkOutputPath, true)) {
133              LOG.warn("Can not delete: " + bulkOutputPath);
134            }
135          }
136          Configuration conf = getConf();
137          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
138          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
139
140          // PHASE 2
141          checkFailure(FailurePhase.PHASE2);
142          player.setConf(getConf());
143          int result = player.run(playerArgs);
144          if (succeeded(result)) {
145            // Add to processed table list
146            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
147          } else {
148            throw new IOException("Can not merge backup images for " + dirs
149              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
150          }
151          LOG.debug("Merge Job finished:" + result);
152        }
153        List<TableName> tableList = toTableNameList(processedTableList);
154        // PHASE 3
155        checkFailure(FailurePhase.PHASE3);
156        table.updateProcessedTablesForMerge(tableList);
157        finishedTables = true;
158
159        // (modification of a backup file system)
160        // Move existing mergedBackupId data into tmp directory
161        // we will need it later in case of a failure
162        Path tmpBackupDir =
163          HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
164        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
165        if (!fs.rename(backupDirPath, tmpBackupDir)) {
166          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
167        } else {
168          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
169        }
170        // Move new data into backup dest
171        for (Pair<TableName, Path> tn : processedTableList) {
172          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
173        }
174        checkFailure(FailurePhase.PHASE4);
175        // Update backup manifest
176        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
177        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
178        // Copy meta files back from tmp to backup dir
179        copyMetaData(fs, tmpBackupDir, backupDirPath);
180        // Delete tmp dir (Rename back during repair)
181        if (!fs.delete(tmpBackupDir, true)) {
182          // WARN and ignore
183          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
184        }
185        // Delete old data
186        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
187        // Finish merge session
188        table.finishMergeOperation();
189        // Release lock
190        table.finishBackupExclusiveOperation();
191      } catch (RuntimeException e) {
192        throw e;
193      } catch (Exception e) {
194        LOG.error(e.toString(), e);
195        if (!finishedTables) {
196          // cleanup bulk directories and finish merge
197          // merge MUST be repeated (no need for repair)
198          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
199          table.finishMergeOperation();
200          table.finishBackupExclusiveOperation();
201          throw new IOException("Backup merge operation failed, you should try it again", e);
202        } else {
203          // backup repair must be run
204          throw new IOException(
205            "Backup merge operation failed, run backup repair tool to restore system's integrity",
206            e);
207        }
208      } finally {
209        table.close();
210        conn.close();
211      }
212    }
213
214    private void checkFailure(FailurePhase phase) throws IOException {
215      if (failurePhase != null && failurePhase == phase) {
216        throw new IOException(phase.toString());
217      }
218    }
219  }
220
221  @Test
222  public void TestIncBackupMergeRestore() throws Exception {
223    int ADD_ROWS = 99;
224    // #1 - create full backup for all tables
225    LOG.info("create full backup image for all tables");
226
227    List<TableName> tables = Lists.newArrayList(table1, table2);
228    // Set custom Merge Job implementation
229    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
230      BackupMergeJobWithFailures.class, BackupMergeJob.class);
231
232    Connection conn = ConnectionFactory.createConnection(conf1);
233
234    Admin admin = conn.getAdmin();
235    BackupAdminImpl client = new BackupAdminImpl(conn);
236
237    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
238    String backupIdFull = client.backupTables(request);
239
240    assertTrue(checkSucceeded(backupIdFull));
241
242    // #2 - insert some data to table1
243    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
244    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
245
246    assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1));
247    t1.close();
248    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
249
250    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
251
252    assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2));
253    t2.close();
254    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
255
256    // #3 - incremental backup for multiple tables
257    tables = Lists.newArrayList(table1, table2);
258    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
259    String backupIdIncMultiple = client.backupTables(request);
260
261    assertTrue(checkSucceeded(backupIdIncMultiple));
262
263    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
264    t1.close();
265
266    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
267    t2.close();
268
269    // #3 - incremental backup for multiple tables
270    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
271    String backupIdIncMultiple2 = client.backupTables(request);
272    assertTrue(checkSucceeded(backupIdIncMultiple2));
273    // #4 Merge backup images with failures
274
275    for (FailurePhase phase : FailurePhase.values()) {
276      Configuration conf = conn.getConfiguration();
277
278      conf.set(FAILURE_PHASE_KEY, phase.toString());
279
280      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
281        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
282        bAdmin.mergeBackups(backups);
283        fail("Expected IOException");
284      } catch (IOException e) {
285        BackupSystemTable table = new BackupSystemTable(conn);
286        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
287          // No need to repair:
288          // Both Merge and backup exclusive operations are finished
289          assertFalse(table.isMergeInProgress());
290          try {
291            table.finishBackupExclusiveOperation();
292            fail("IOException is expected");
293          } catch (IOException ee) {
294            // Expected
295          }
296        } else {
297          // Repair is required
298          assertTrue(table.isMergeInProgress());
299          try {
300            table.startBackupExclusiveOperation();
301            fail("IOException is expected");
302          } catch (IOException ee) {
303            // Expected - clean up before proceeding
304            // table.finishMergeOperation();
305            // table.finishBackupExclusiveOperation();
306          }
307        }
308        table.close();
309        LOG.debug("Expected :" + e.getMessage());
310      }
311    }
312    // Now merge w/o failures
313    Configuration conf = conn.getConfiguration();
314    conf.unset(FAILURE_PHASE_KEY);
315    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
316    // Now run repair
317    BackupSystemTable sysTable = new BackupSystemTable(conn);
318    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
319    // Now repeat merge
320    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
321      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
322      bAdmin.mergeBackups(backups);
323    }
324
325    // #6 - restore incremental backup for multiple tables, with overwrite
326    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
327    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
328    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
329      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
330
331    Table hTable = conn.getTable(table1_restore);
332    LOG.debug("After incremental restore: " + hTable.getDescriptor());
333    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
334    assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName));
335
336    hTable.close();
337
338    hTable = conn.getTable(table2_restore);
339    assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable));
340    hTable.close();
341
342    admin.close();
343    conn.close();
344  }
345}