/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

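/**
 * A {@link RestoreJob} implementation that restores backed-up store files by bulk loading them,
 * grouped by column family, into the target tables (which, per the class name, are expected to
 * retain the backup's original region splits).
 * <p>
 * A minimal usage sketch, assuming the backup root is already known; all variable names below are
 * hypothetical stand-ins, not part of this class:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(RestoreJob.BACKUP_ROOT_PATH_KEY, backupRootDir.toString());
 * RestoreJob restoreJob = new MapReduceRestoreToOriginalSplitsJob();
 * restoreJob.setConf(conf);
 * // Full-backup restore: dirPaths are the backed-up region directories.
 * restoreJob.run(dirPaths, new TableName[] { fromTable }, restoreRootDir,
 *   new TableName[] { toTable }, true);
 * </pre>
 */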
@InterfaceAudience.Private
public class MapReduceRestoreToOriginalSplitsJob implements RestoreJob {
  private Configuration conf;

  @Override
  public void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir,
    TableName[] toTables, boolean fullBackupRestore) throws IOException {
    Configuration conf = getConf();

    // We are using the files from the snapshot, so copy them rather than moving them over.
    conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);

    Path backupRootDir = new Path(conf.get(RestoreJob.BACKUP_ROOT_PATH_KEY));

    FileSystem fs = backupRootDir.getFileSystem(conf);
    Map<byte[], List<Path>> family2Files = buildFamily2Files(fs, dirPaths, fullBackupRestore);

    // Bulk load the collected files into each target table.
    BulkLoadHFiles bulkLoad = BulkLoadHFiles.create(conf);
    for (int i = 0; i < fromTables.length; i++) {
      bulkLoad.bulkLoad(toTables[i], family2Files);
    }
  }

  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Groups the given paths by column family. For a full backup the paths are region directories
   * and every store file beneath them is collected; otherwise the paths are store files whose
   * parent directory is named after the family.
   */
  private static Map<byte[], List<Path>> buildFamily2Files(FileSystem fs, Path[] dirs,
    boolean isFullBackup) throws IOException {
    if (isFullBackup) {
      return buildFullBackupFamily2Files(fs, dirs);
    }

    // byte[] has identity-based equals()/hashCode(), so a plain HashMap would never group two
    // paths under the same family key; use a content-comparing map instead.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    for (Path dir : dirs) {
      byte[] familyName = Bytes.toBytes(dir.getParent().getName());
      family2Files.computeIfAbsent(familyName, k -> Lists.newArrayList()).add(dir);
    }

    return family2Files;
  }

  /** Walks each region directory and collects its store files, keyed by column family. */
  private static Map<byte[], List<Path>> buildFullBackupFamily2Files(FileSystem fs, Path[] dirs)
    throws IOException {
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Path regionPath : dirs) {
      FSVisitor.visitRegionStoreFiles(fs, regionPath, (region, family, name) -> {
        Path path = new Path(regionPath, new Path(family, name));
        byte[] familyName = Bytes.toBytes(family);
        family2Files.computeIfAbsent(familyName, k -> Lists.newArrayList()).add(path);
      });
    }
    return family2Files;
  }
}