001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup;
019
020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
033import org.apache.hadoop.hbase.backup.impl.BackupCommands;
034import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
035import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
036import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
037import org.apache.hadoop.hbase.backup.util.BackupUtils;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.Connection;
040import org.apache.hadoop.hbase.client.ConnectionFactory;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.util.Pair;
044import org.junit.jupiter.api.Assertions;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052@Tag(LargeTests.TAG)
053public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
054
055  private static final Logger LOG =
056    LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);
057
058  enum FailurePhase {
059    PHASE1,
060    PHASE2,
061    PHASE3,
062    PHASE4
063  }
064
065  public final static String FAILURE_PHASE_KEY = "failurePhase";
066
067  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
068    FailurePhase failurePhase;
069
070    @Override
071    public void setConf(Configuration conf) {
072      super.setConf(conf);
073      String val = conf.get(FAILURE_PHASE_KEY);
074      if (val != null) {
075        failurePhase = FailurePhase.valueOf(val);
076      } else {
077        Assertions.fail("Failure phase is not set");
078      }
079    }
080
081    /**
082     * This is the exact copy of parent's run() with injections of different types of failures
083     */
084    @Override
085    public void run(String[] backupIds) throws IOException {
086      String bulkOutputConfKey;
087
088      // TODO : run player on remote cluster
089      player = new MapReduceHFileSplitterJob();
090      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
091      // Player reads all files in arbitrary directory structure and creates
092      // a Map task for each file
093      String bids = StringUtils.join(backupIds, ",");
094
095      if (LOG.isDebugEnabled()) {
096        LOG.debug("Merge backup images " + bids);
097      }
098
099      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
100      boolean finishedTables = false;
101      Connection conn = ConnectionFactory.createConnection(getConf());
102      BackupSystemTable table = new BackupSystemTable(conn);
103      FileSystem fs = FileSystem.get(getConf());
104
105      try {
106        // Start backup exclusive operation
107        table.startBackupExclusiveOperation();
108        // Start merge operation
109        table.startMergeOperation(backupIds);
110
111        // Select most recent backup id
112        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
113
114        TableName[] tableNames = getTableNamesInBackupImages(backupIds);
115
116        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
117        String backupRoot = bInfo.getBackupRootDir();
118        // PHASE 1
119        checkFailure(FailurePhase.PHASE1);
120
121        for (int i = 0; i < tableNames.length; i++) {
122          LOG.info("Merge backup images for " + tableNames[i]);
123
124          // Find input directories for table
125          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
126          String dirs = StringUtils.join(dirPaths, ",");
127          Path bulkOutputPath = BackupUtils.getBulkOutputDir(
128            BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
129          // Delete content if exists
130          if (fs.exists(bulkOutputPath)) {
131            if (!fs.delete(bulkOutputPath, true)) {
132              LOG.warn("Can not delete: " + bulkOutputPath);
133            }
134          }
135          Configuration conf = getConf();
136          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
137          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
138
139          // PHASE 2
140          checkFailure(FailurePhase.PHASE2);
141          player.setConf(getConf());
142          int result = player.run(playerArgs);
143          if (succeeded(result)) {
144            // Add to processed table list
145            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
146          } else {
147            throw new IOException("Can not merge backup images for " + dirs
148              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
149          }
150          LOG.debug("Merge Job finished:" + result);
151        }
152        List<TableName> tableList = toTableNameList(processedTableList);
153        // PHASE 3
154        checkFailure(FailurePhase.PHASE3);
155        table.updateProcessedTablesForMerge(tableList);
156        finishedTables = true;
157
158        // (modification of a backup file system)
159        // Move existing mergedBackupId data into tmp directory
160        // we will need it later in case of a failure
161        Path tmpBackupDir =
162          HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, mergedBackupId);
163        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
164        if (!fs.rename(backupDirPath, tmpBackupDir)) {
165          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
166        } else {
167          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
168        }
169        // Move new data into backup dest
170        for (Pair<TableName, Path> tn : processedTableList) {
171          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
172        }
173        checkFailure(FailurePhase.PHASE4);
174        // Update backup manifest
175        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
176        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
177        // Copy meta files back from tmp to backup dir
178        copyMetaData(fs, tmpBackupDir, backupDirPath);
179        // Delete tmp dir (Rename back during repair)
180        if (!fs.delete(tmpBackupDir, true)) {
181          // WARN and ignore
182          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
183        }
184        // Delete old data
185        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
186        // Finish merge session
187        table.finishMergeOperation();
188        // Release lock
189        table.finishBackupExclusiveOperation();
190      } catch (RuntimeException e) {
191        throw e;
192      } catch (Exception e) {
193        LOG.error(e.toString(), e);
194        if (!finishedTables) {
195          // cleanup bulk directories and finish merge
196          // merge MUST be repeated (no need for repair)
197          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
198          table.finishMergeOperation();
199          table.finishBackupExclusiveOperation();
200          throw new IOException("Backup merge operation failed, you should try it again", e);
201        } else {
202          // backup repair must be run
203          throw new IOException(
204            "Backup merge operation failed, run backup repair tool to restore system's integrity",
205            e);
206        }
207      } finally {
208        table.close();
209        conn.close();
210      }
211    }
212
213    private void checkFailure(FailurePhase phase) throws IOException {
214      if (failurePhase != null && failurePhase == phase) {
215        throw new IOException(phase.toString());
216      }
217    }
218  }
219
220  @Test
221  public void TestIncBackupMergeRestore() throws Exception {
222    int ADD_ROWS = 99;
223    // #1 - create full backup for all tables
224    LOG.info("create full backup image for all tables");
225
226    List<TableName> tables = Lists.newArrayList(table1, table2);
227    // Set custom Merge Job implementation
228    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
229      BackupMergeJobWithFailures.class, BackupMergeJob.class);
230
231    Connection conn = ConnectionFactory.createConnection(conf1);
232
233    Admin admin = conn.getAdmin();
234    BackupAdminImpl client = new BackupAdminImpl(conn);
235
236    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
237    String backupIdFull = client.backupTables(request);
238
239    assertTrue(checkSucceeded(backupIdFull));
240
241    // #2 - insert some data to table1
242    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
243    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
244
245    Assertions.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1));
246    t1.close();
247    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
248
249    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
250
251    Assertions.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2));
252    t2.close();
253    LOG.debug("written " + ADD_ROWS + " rows to " + table2);
254
255    // #3 - incremental backup for multiple tables
256    tables = Lists.newArrayList(table1, table2);
257    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
258    String backupIdIncMultiple = client.backupTables(request);
259
260    assertTrue(checkSucceeded(backupIdIncMultiple));
261
262    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
263    t1.close();
264
265    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
266    t2.close();
267
268    // #3 - incremental backup for multiple tables
269    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
270    String backupIdIncMultiple2 = client.backupTables(request);
271    assertTrue(checkSucceeded(backupIdIncMultiple2));
272    // #4 Merge backup images with failures
273
274    for (FailurePhase phase : FailurePhase.values()) {
275      Configuration conf = conn.getConfiguration();
276
277      conf.set(FAILURE_PHASE_KEY, phase.toString());
278
279      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
280        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
281        bAdmin.mergeBackups(backups);
282        Assertions.fail("Expected IOException");
283      } catch (IOException e) {
284        BackupSystemTable table = new BackupSystemTable(conn);
285        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
286          // No need to repair:
287          // Both Merge and backup exclusive operations are finished
288          assertFalse(table.isMergeInProgress());
289          try {
290            table.finishBackupExclusiveOperation();
291            Assertions.fail("IOException is expected");
292          } catch (IOException ee) {
293            // Expected
294          }
295        } else {
296          // Repair is required
297          assertTrue(table.isMergeInProgress());
298          try {
299            table.startBackupExclusiveOperation();
300            Assertions.fail("IOException is expected");
301          } catch (IOException ee) {
302            // Expected - clean up before proceeding
303            // table.finishMergeOperation();
304            // table.finishBackupExclusiveOperation();
305          }
306        }
307        table.close();
308        LOG.debug("Expected :" + e.getMessage());
309      }
310    }
311    // Now merge w/o failures
312    Configuration conf = conn.getConfiguration();
313    conf.unset(FAILURE_PHASE_KEY);
314    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
315    // Now run repair
316    BackupSystemTable sysTable = new BackupSystemTable(conn);
317    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
318    // Now repeat merge
319    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
320      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
321      bAdmin.mergeBackups(backups);
322    }
323
324    // #6 - restore incremental backup for multiple tables, with overwrite
325    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
326    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
327    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
328      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
329
330    Table hTable = conn.getTable(table1_restore);
331    LOG.debug("After incremental restore: " + hTable.getDescriptor());
332    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
333    Assertions.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName));
334
335    hTable.close();
336
337    hTable = conn.getTable(table2_restore);
338    Assertions.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable));
339    hTable.close();
340
341    admin.close();
342    conn.close();
343  }
344}