/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link RestoreJob}. For backup restore, it runs the
 * {@link MapReduceHFileSplitterJob} job, which creates HFiles aligned with the region boundaries
 * of the table being restored. The resulting HFiles are then loaded using the HBase bulk load
 * tool {@link BulkLoadHFiles}.
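 * <p>
 * A minimal usage sketch (the backup directory and table names below are hypothetical):
 *
 * <pre>
 * RestoreJob job = new MapReduceRestoreJob();
 * job.setConf(HBaseConfiguration.create());
 * job.run(new Path[] { new Path("hdfs://backup/t1") }, new TableName[] { TableName.valueOf("t1") },
 *   new TableName[] { TableName.valueOf("t1_restored") }, true);
 * </pre>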
 */
@InterfaceAudience.Private
public class MapReduceRestoreJob implements RestoreJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceRestoreJob.class);

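  // Tool that runs the MapReduceHFileSplitterJob; a fresh instance is created on each run().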
  private Tool player;
  private Configuration conf;

  public MapReduceRestoreJob() {
  }

  @Override
  public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
    boolean fullBackupRestore) throws IOException {
    player = new MapReduceHFileSplitterJob();
    String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // The player reads all files under an arbitrary directory structure and creates
    // a map task for each file.
    String dirs = StringUtils.join(dirPaths, ",");
    if (LOG.isDebugEnabled()) {
      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
        + " backup from directory " + dirs + " for hbase tables "
        + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
        + " to tables "
        + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
    }
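    // Restore each table independently: first run the splitter job to produce
    // region-aligned HFiles, then bulk load them into the target table.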
    for (int i = 0; i < tableNames.length; i++) {
      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

      Path bulkOutputPath = BackupUtils
        .getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
      getConf().set(bulkOutputConfKey, bulkOutputPath.toString());
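      // Splitter job arguments: the input directories, plus the table the job splits
      // against (the target table for a full restore, the original table otherwise).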
      String[] playerArgs = { dirs,
        fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i].getNameAsString() };

      int result;
      try {
        player.setConf(getConf());
        result = player.run(playerArgs);
        if (succeeded(result)) {
          // Splitter succeeded; bulk load the generated HFiles into the target table.
          BulkLoadHFiles loader = BackupUtils.createLoader(getConf());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
          }

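          // An empty result from the bulk loader means no HFiles were actually loaded.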
          if (loader.bulkLoad(newTableNames[i], bulkOutputPath).isEmpty()) {
            throw new IOException("Can not restore from backup directory " + dirs
              + " (check Hadoop and HBase logs). Bulk loader returned an empty result");
          }
        } else {
          throw new IOException("Can not restore from backup directory " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code = " + result);
        }
        LOG.debug("Restore Job finished: " + result);
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        throw new IOException(
          "Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs)", e);
      }
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}