001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.mapreduce;
019
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
035
036@InterfaceAudience.Private
037public class MapReduceRestoreToOriginalSplitsJob implements RestoreJob {
038  private Configuration conf;
039
040  @Override
041  public void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir,
042    TableName[] toTables, boolean fullBackupRestore) throws IOException {
043    Configuration conf = getConf();
044
045    // We are using the files from the snapshot. We should copy them rather than move them over
046    conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
047
048    Path backupRootDir = new Path(conf.get(RestoreJob.BACKUP_ROOT_PATH_KEY));
049
050    FileSystem fs = backupRootDir.getFileSystem(conf);
051    Map<byte[], List<Path>> family2Files = buildFamily2Files(fs, dirPaths, fullBackupRestore);
052
053    BulkLoadHFiles bulkLoad = BulkLoadHFiles.create(conf);
054    for (int i = 0; i < fromTables.length; i++) {
055      bulkLoad.bulkLoad(toTables[i], family2Files);
056    }
057  }
058
059  @Override
060  public void setConf(Configuration configuration) {
061    this.conf = configuration;
062  }
063
064  @Override
065  public Configuration getConf() {
066    return conf;
067  }
068
069  private static Map<byte[], List<Path>> buildFamily2Files(FileSystem fs, Path[] dirs,
070    boolean isFullBackup) throws IOException {
071    if (isFullBackup) {
072      return buildFullBackupFamily2Files(fs, dirs);
073    }
074
075    Map<byte[], List<Path>> family2Files = new HashMap<>();
076
077    for (Path dir : dirs) {
078      byte[] familyName = Bytes.toBytes(dir.getParent().getName());
079      if (family2Files.containsKey(familyName)) {
080        family2Files.get(familyName).add(dir);
081      } else {
082        family2Files.put(familyName, Lists.newArrayList(dir));
083      }
084    }
085
086    return family2Files;
087  }
088
089  private static Map<byte[], List<Path>> buildFullBackupFamily2Files(FileSystem fs, Path[] dirs)
090    throws IOException {
091    Map<byte[], List<Path>> family2Files = new HashMap<>();
092    for (Path regionPath : dirs) {
093      FSVisitor.visitRegionStoreFiles(fs, regionPath, (region, family, name) -> {
094        Path path = new Path(regionPath, new Path(family, name));
095        byte[] familyName = Bytes.toBytes(family);
096        if (family2Files.containsKey(familyName)) {
097          family2Files.get(familyName).add(path);
098        } else {
099          family2Files.put(familyName, Lists.newArrayList(path));
100        }
101      });
102    }
103    return family2Files;
104  }
105
106}