/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link RestoreJob}. For a backup restore, it runs the
 * {@link MapReduceHFileSplitterJob} job, which creates HFiles aligned with the region boundaries
 * of the table being restored. The resulting HFiles are then loaded using the HBase bulk load
 * tool {@link BulkLoadHFiles}.
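 * <p>
 * A minimal usage sketch; the paths and table names below are illustrative, not part of this
 * API:
 *
 * <pre>
 * MapReduceRestoreJob job = new MapReduceRestoreJob();
 * job.setConf(HBaseConfiguration.create());
 * // Restore a full backup of table "t1" into table "t1_restored", staging the bulk-load
 * // output under the given restore root directory.
 * job.run(new Path[] { new Path("hdfs:///backup/data/t1") },
 *   new TableName[] { TableName.valueOf("t1") },
 *   new Path("hdfs:///user/hbase/restore-staging"),
 *   new TableName[] { TableName.valueOf("t1_restored") }, true);
 * </pre>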
 */
@InterfaceAudience.Private
public class MapReduceRestoreJob implements RestoreJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceRestoreJob.class);

  private Tool player;
  private Configuration conf;

  public MapReduceRestoreJob() {
  }

  @Override
  public void run(Path[] dirPaths, TableName[] tableNames, Path restoreRootDir,
    TableName[] newTableNames, boolean fullBackupRestore) throws IOException {
    player = new MapReduceHFileSplitterJob();
    String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // The player reads all files under the given directories, in any directory structure,
    // and creates one map task per file.
    String dirs = StringUtils.join(dirPaths, ",");

    if (LOG.isDebugEnabled()) {
      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
        + " backup from directory " + dirs + " for hbase tables "
        + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
        + " to tables "
        + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
    }

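    // Restore each table independently: first run the splitter job to produce region-aligned
    // HFiles in a per-table staging directory, then bulk load those HFiles into the target table.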
    for (int i = 0; i < tableNames.length; i++) {
      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
      // Stage the splitter job's output in a directory derived from the target table name.
      Path bulkOutputPath = BackupUtils.getBulkOutputDir(restoreRootDir,
        BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
      getConf().set(bulkOutputConfKey, bulkOutputPath.toString());
      String[] playerArgs = { dirs, newTableNames[i].getNameAsString() };
      int result;
      try {
        player.setConf(getConf());
        result = player.run(playerArgs);
        if (succeeded(result)) {
          // The splitter job succeeded; bulk load the staged HFiles into the target table.
          BulkLoadHFiles loader = BackupUtils.createLoader(getConf());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
          }

          loader.bulkLoad(newTableNames[i], bulkOutputPath);
        } else {
          throw new IOException("Cannot restore from backup directory " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code = " + result);
        }
        LOG.debug("Restore Job finished: " + result);
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        throw new IOException(
          "Cannot restore from backup directory " + dirs + " (check Hadoop and HBase logs)", e);
      }
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}