/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithFailures.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestIncrementalBackupMergeWithFailures.class);

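  /**
   * Points in the merge job's run() below at which a failure is injected: PHASE1 right after the
   * merge operation has started and the backup info has been read, PHASE2 inside the per-table
   * loop before the HFile splitter job runs, PHASE3 before the processed-tables list is
   * persisted, and PHASE4 after the merged data has been moved into the backup destination but
   * before the manifest is updated.
   */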
  enum FailurePhase {
    PHASE1, PHASE2, PHASE3, PHASE4
  }

  public static final String FAILURE_PHASE_KEY = "failurePhase";

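  /**
   * MapReduceBackupMergeJob subclass that reads the failure phase from the configuration
   * (FAILURE_PHASE_KEY) and throws an IOException when run() reaches that phase, so the test can
   * exercise the recovery paths.
   */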
  static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob {
    FailurePhase failurePhase;

    @Override
    public void setConf(Configuration conf) {
      super.setConf(conf);
      String val = conf.get(FAILURE_PHASE_KEY);
      if (val != null) {
        failurePhase = FailurePhase.valueOf(val);
      } else {
        Assert.fail("Failure phase is not set");
      }
    }

    /**
     * This is an exact copy of the parent's run() with failure injection points
     * at the different phases.
     */
    @Override
    public void run(String[] backupIds) throws IOException {
      String bulkOutputConfKey;

      // TODO: run the player on a remote cluster
      player = new MapReduceHFileSplitterJob();
      bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
      // The player reads all files in an arbitrary directory structure and creates
      // a map task for each file
      String bids = StringUtils.join(backupIds, ",");

      if (LOG.isDebugEnabled()) {
        LOG.debug("Merge backup images " + bids);
      }

      List<Pair<TableName, Path>> processedTableList = new ArrayList<>();
      boolean finishedTables = false;
      Connection conn = ConnectionFactory.createConnection(getConf());
      BackupSystemTable table = new BackupSystemTable(conn);
      FileSystem fs = FileSystem.get(getConf());

      try {
        // Start backup exclusive operation
        table.startBackupExclusiveOperation();
        // Start merge operation
        table.startMergeOperation(backupIds);

        // Select most recent backup id
        String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);

        TableName[] tableNames = getTableNamesInBackupImages(backupIds);

        BackupInfo bInfo = table.readBackupInfo(backupIds[0]);
        String backupRoot = bInfo.getBackupRootDir();
        // PHASE 1
        checkFailure(FailurePhase.PHASE1);

        for (int i = 0; i < tableNames.length; i++) {
          LOG.info("Merge backup images for " + tableNames[i]);

          // Find input directories for the table
          Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
          String dirs = StringUtils.join(dirPaths, ",");
          Path bulkOutputPath =
              BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
                getConf(), false);
          // Delete the output directory if it already exists
          if (fs.exists(bulkOutputPath)) {
            if (!fs.delete(bulkOutputPath, true)) {
              LOG.warn("Cannot delete: " + bulkOutputPath);
            }
          }
          Configuration conf = getConf();
          conf.set(bulkOutputConfKey, bulkOutputPath.toString());
          String[] playerArgs = { dirs, tableNames[i].getNameAsString() };

          // PHASE 2
          checkFailure(FailurePhase.PHASE2);
          player.setConf(getConf());
          int result = player.run(playerArgs);
          if (succeeded(result)) {
            // Add to processed table list
            processedTableList.add(new Pair<>(tableNames[i], bulkOutputPath));
          } else {
            throw new IOException("Cannot merge backup images for " + dirs
                + " (check Hadoop/MR and HBase logs). Player return code = " + result);
          }
          LOG.debug("Merge job finished: " + result);
        }
        List<TableName> tableList = toTableNameList(processedTableList);
        // PHASE 3
        checkFailure(FailurePhase.PHASE3);
        table.updateProcessedTablesForMerge(tableList);
        finishedTables = true;

        // From this point on, the backup file system is modified.
        // Move the existing mergedBackupId data into a tmp directory;
        // we will need it later in case of a failure
        Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
          mergedBackupId);
        Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
        if (!fs.rename(backupDirPath, tmpBackupDir)) {
          throw new IOException("Failed to rename " + backupDirPath + " to " + tmpBackupDir);
        } else {
          LOG.debug("Renamed " + backupDirPath + " to " + tmpBackupDir);
        }
        // Move the new data into the backup destination
        for (Pair<TableName, Path> tn : processedTableList) {
          moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
        }
        // PHASE 4
        checkFailure(FailurePhase.PHASE4);
        // Update backup manifest
        List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
        updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
        // Copy meta files back from tmp to the backup dir
        copyMetaData(fs, tmpBackupDir, backupDirPath);
        // Delete the tmp dir (it is renamed back during repair)
        if (!fs.delete(tmpBackupDir, true)) {
          // WARN and ignore
          LOG.warn("Could not delete tmp dir: " + tmpBackupDir);
        }
        // Delete old data
        deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
        // Finish merge session
        table.finishMergeOperation();
        // Release lock
        table.finishBackupExclusiveOperation();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        if (!finishedTables) {
          // cleanup bulk directories and finish merge
          // merge MUST be repeated (no need for repair)
          cleanupBulkLoadDirs(fs, toPathList(processedTableList));
          table.finishMergeOperation();
          table.finishBackupExclusiveOperation();
          throw new IOException("Backup merge operation failed, you should try it again", e);
        } else {
          // backup repair must be run
          throw new IOException(
              "Backup merge operation failed, run backup repair tool to restore system's integrity",
              e);
        }
      } finally {
        table.close();
        conn.close();
      }
    }

    private void checkFailure(FailurePhase phase) throws IOException {
      if (failurePhase != null && failurePhase == phase) {
        throw new IOException(phase.toString());
      }
    }
  }

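  /**
   * End-to-end scenario: take a full backup and two incremental backups, attempt to merge the
   * incremental images with a failure injected at each phase, verify the expected system state
   * after every failed attempt, run the repair tool, repeat the merge successfully, and finally
   * restore the merged incremental backup and verify the row counts.
   */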
  @Test
  public void testIncBackupMergeRestore() throws Exception {
    int ADD_ROWS = 99;
    // #1 - create full backup for all tables
    LOG.info("create full backup image for all tables");

    List<TableName> tables = Lists.newArrayList(table1, table2);
    // Set custom Merge Job implementation
    conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS,
      BackupMergeJobWithFailures.class, BackupMergeJob.class);

    Connection conn = ConnectionFactory.createConnection(conf1);

    Admin admin = conn.getAdmin();
    BackupAdminImpl client = new BackupAdminImpl(conn);

    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
    String backupIdFull = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdFull));

    // #2 - insert some data into table1 and table2
    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);

    Assert.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t1));
    t1.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table1);

    Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);

    Assert.assertEquals(NB_ROWS_IN_BATCH + ADD_ROWS, TEST_UTIL.countRows(t2));
    t2.close();
    LOG.debug("written " + ADD_ROWS + " rows to " + table2);

    // #3 - incremental backup for multiple tables
    tables = Lists.newArrayList(table1, table2);
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple = client.backupTables(request);

    assertTrue(checkSucceeded(backupIdIncMultiple));

    t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
    t1.close();

    t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
    t2.close();

    // #4 - second incremental backup for multiple tables
    request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
    String backupIdIncMultiple2 = client.backupTables(request);
    assertTrue(checkSucceeded(backupIdIncMultiple2));

    // #5 - merge backup images with failures injected at each phase
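    // PHASE1-PHASE3 failures happen before the backup file system is modified, so no repair is
    // needed afterwards; a PHASE4 failure leaves the merge in progress and requires the repair
    // tool, which is verified inside the loop and exercised after it.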
    for (FailurePhase phase : FailurePhase.values()) {
      Configuration conf = conn.getConfiguration();

      conf.set(FAILURE_PHASE_KEY, phase.toString());

      try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
        String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
        bAdmin.mergeBackups(backups);
        Assert.fail("Expected IOException");
      } catch (IOException e) {
        BackupSystemTable table = new BackupSystemTable(conn);
        if (phase.ordinal() < FailurePhase.PHASE4.ordinal()) {
          // No need to repair:
          // both the merge and the backup exclusive operations are finished
          assertFalse(table.isMergeInProgress());
          try {
            table.finishBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        } else {
          // Repair is required
          assertTrue(table.isMergeInProgress());
          try {
            table.startBackupExclusiveOperation();
            Assert.fail("IOException is expected");
          } catch (IOException ee) {
            // Expected
          }
        }
        table.close();
        LOG.debug("Expected: " + e.getMessage());
      }
    }
    // Now merge without failures
    Configuration conf = conn.getConfiguration();
    conf.unset(FAILURE_PHASE_KEY);
    conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
    // Run the repair tool to finish the merge left in progress by the PHASE4 failure
    BackupSystemTable sysTable = new BackupSystemTable(conn);
    BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
    // Repeat the merge, this time expecting success
    try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
      String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
      bAdmin.mergeBackups(backups);
    }

    // #6 - restore incremental backup for multiple tables, with overwrite
    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
      tablesRestoreIncMultiple, tablesMapIncMultiple, true));

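    // Each restored table should contain the initial batch plus the rows added before each of
    // the two incremental backups (NB_ROWS_IN_BATCH + 2 * ADD_ROWS).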
    Table hTable = conn.getTable(table1_restore);
    LOG.debug("After incremental restore: " + hTable.getDescriptor());
    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
    Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable, famName));

    hTable.close();

    hTable = conn.getTable(table2_restore);
    Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, TEST_UTIL.countRows(hTable));
    hTable.close();

    admin.close();
    conn.close();
  }
}