001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.mapreduce; 019 020import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; 021 022import java.io.IOException; 023import org.apache.commons.lang3.StringUtils; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.backup.BackupRestoreConstants; 028import org.apache.hadoop.hbase.backup.RestoreJob; 029import org.apache.hadoop.hbase.backup.util.BackupUtils; 030import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 031import org.apache.hadoop.util.Tool; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * MapReduce implementation of {@link RestoreJob} For backup restore, it runs 038 * {@link MapReduceHFileSplitterJob} job and creates HFiles which are aligned with a region 039 * boundaries of a table being restored. The resulting HFiles then are loaded using HBase bulk load 040 * tool {@link BulkLoadHFiles}. 041 */ 042@InterfaceAudience.Private 043public class MapReduceRestoreJob implements RestoreJob { 044 public static final Logger LOG = LoggerFactory.getLogger(MapReduceRestoreJob.class); 045 046 private Tool player; 047 private Configuration conf; 048 049 public MapReduceRestoreJob() { 050 } 051 052 @Override 053 public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, 054 boolean fullBackupRestore) throws IOException { 055 String bulkOutputConfKey; 056 057 player = new MapReduceHFileSplitterJob(); 058 bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; 059 // Player reads all files in arbitrary directory structure and creates 060 // a Map task for each file 061 String dirs = StringUtils.join(dirPaths, ","); 062 063 if (LOG.isDebugEnabled()) { 064 LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") 065 + " backup from directory " + dirs + " from hbase tables " 066 + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) 067 + " to tables " 068 + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); 069 } 070 071 for (int i = 0; i < tableNames.length; i++) { 072 LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); 073 074 Path bulkOutputPath = BackupUtils 075 .getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf()); 076 Configuration conf = getConf(); 077 conf.set(bulkOutputConfKey, bulkOutputPath.toString()); 078 String[] playerArgs = { dirs, 079 fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i].getNameAsString() }; 080 081 int result; 082 try { 083 084 player.setConf(getConf()); 085 result = player.run(playerArgs); 086 if (succeeded(result)) { 087 // do bulk load 088 BulkLoadHFiles loader = BackupUtils.createLoader(getConf()); 089 if (LOG.isDebugEnabled()) { 090 LOG.debug("Restoring HFiles from directory " + bulkOutputPath); 091 } 092 093 if (loader.bulkLoad(newTableNames[i], bulkOutputPath).isEmpty()) { 094 throw new IOException("Can not restore from backup directory " + dirs 095 + " (check Hadoop and HBase logs). Bulk loader returns null"); 096 } 097 } else { 098 throw new IOException("Can not restore from backup directory " + dirs 099 + " (check Hadoop/MR and HBase logs). Player return code =" + result); 100 } 101 LOG.debug("Restore Job finished:" + result); 102 } catch (Exception e) { 103 LOG.error(e.toString(), e); 104 throw new IOException( 105 "Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); 106 } 107 } 108 } 109 110 @Override 111 public Configuration getConf() { 112 return conf; 113 } 114 115 @Override 116 public void setConf(Configuration conf) { 117 this.conf = conf; 118 } 119}