/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;

import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce implementation of {@link RestoreJob}.
 * <p>
 * For backup restore, it runs a {@link MapReduceHFileSplitterJob} job and creates HFiles which are
 * aligned with the region boundaries of the table being restored. The resulting HFiles are then
 * loaded using the HBase bulk load tool {@link BulkLoadHFiles}.
 */
@InterfaceAudience.Private
public class MapReduceRestoreJob implements RestoreJob {
  public static final Logger LOG = LoggerFactory.getLogger(MapReduceRestoreJob.class);

  /** Splitter tool used by the most recent {@link #run} invocation. */
  private Tool player;
  /** Configuration handed in through {@link #setConf(Configuration)}. */
  private Configuration conf;

  public MapReduceRestoreJob() {
  }

  /**
   * Restores each table in {@code tableNames} into the table at the same index of
   * {@code newTableNames}. For every pair the splitter job is run over all of {@code dirPaths}
   * with its bulk output directory set in the configuration, and on success the produced HFiles
   * are bulk loaded into the target table. Tables are processed sequentially; the first failure
   * aborts the remaining ones.
   * @param dirPaths          backup directories; joined with commas and passed to the splitter job
   * @param tableNames        source tables; determines how many restores are performed
   * @param restoreRootDir    root directory under which the bulk output directory is created
   * @param newTableNames     target tables, matched to {@code tableNames} by index
   * @param fullBackupRestore only used to label the debug log message as full or incremental
   * @throws IOException              if the splitter job reports a non-success return code, or if
   *                                  the job or the bulk load throws; the original failure is
   *                                  attached as the cause
   * @throws IllegalArgumentException if {@code newTableNames} has fewer entries than
   *                                  {@code tableNames}
   */
  @Override
  public void run(Path[] dirPaths, TableName[] tableNames, Path restoreRootDir,
    TableName[] newTableNames, boolean fullBackupRestore) throws IOException {
    // Fail before any table is touched instead of running out of target names half-way through.
    if (newTableNames.length < tableNames.length) {
      throw new IllegalArgumentException("Expected a target table for each of the "
        + tableNames.length + " source tables but got " + newTableNames.length);
    }

    player = new MapReduceHFileSplitterJob();
    String bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY;
    // Player reads all files in arbitrary directory structure and creates
    // a Map task for each file
    String dirs = StringUtils.join(dirPaths, ",");

    // The guard stays because the joined table lists are computed eagerly as arguments.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Restore {} backup from directory {} from hbase tables {} to tables {}",
        fullBackupRestore ? "full" : "incremental", dirs,
        StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND),
        StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
    }

    for (int i = 0; i < tableNames.length; i++) {
      LOG.info("Restore {} into {}", tableNames[i], newTableNames[i]);
      Configuration jobConf = getConf();
      Path bulkOutputPath = BackupUtils.getBulkOutputDir(restoreRootDir,
        BackupUtils.getFileNameCompatibleString(newTableNames[i]), jobConf);
      // The output location is written into the configuration that is handed to the splitter job.
      jobConf.set(bulkOutputConfKey, bulkOutputPath.toString());
      String[] playerArgs = { dirs, newTableNames[i].getNameAsString() };
      try {
        player.setConf(jobConf);
        int result = player.run(playerArgs);
        if (!succeeded(result)) {
          // Raised inside the try on purpose: it is logged and wrapped by the catch below,
          // exactly like any other failure of this step.
          throw new IOException("Can not restore from backup directory " + dirs
            + " (check Hadoop/MR and HBase logs). Player return code =" + result);
        }
        // do bulk load
        BulkLoadHFiles loader = BackupUtils.createLoader(jobConf);
        LOG.debug("Restoring HFiles from directory {}", bulkOutputPath);
        loader.bulkLoad(newTableNames[i], bulkOutputPath);
        LOG.debug("Restore Job finished:{}", result);
      } catch (Exception e) {
        LOG.error(e.toString(), e);
        throw new IOException(
          "Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e);
      }
    }
  }

  /** Returns the configuration previously supplied via {@link #setConf(Configuration)}. */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /** Stores the configuration used by subsequent {@link #run} invocations. */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}