/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat}, allowing a
 * MapReduce job to run over one or more table snapshots, with one or more scans configured for
 * each. Internally, the input format delegates to {@link TableSnapshotInputFormat} and thus has
 * the same performance advantages; see {@link TableSnapshotInputFormat} for more details.
 * <p>
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans. For
 * each snapshot in the map, each corresponding scan will be applied; the overall dataset for the
 * job is defined by the concatenation of the regions and tables included in each snapshot/scan
 * pair.
 * <p>
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map, Class, Class, Class,
 * org.apache.hadoop.mapreduce.Job, boolean, org.apache.hadoop.fs.Path)} can be used to configure
 * the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region). See
 * {@link TableSnapshotInputFormat} for more notes on permissioning; the same caveats apply here.
 *
 * @see TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {

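  /** Shared implementation that computes splits from the snapshot/scan configuration. */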
  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

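  /**
   * Computes the input splits (one per region of each snapshot/scan pair, as described in the
   * class documentation) by delegating to {@link MultiTableSnapshotInputFormatImpl}.
   */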
  @Override
  public List<InputSplit> getSplits(JobContext jobContext)
      throws IOException, InterruptedException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
        delegate.getSplits(jobContext.getConfiguration());
    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());

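    // Wrap each implementation-level split in the MapReduce-facing split type.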
    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
    }

    return rtn;
  }

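  /**
   * Configure configuration to read from snapshotScans, with snapshots restored to a
   * subdirectory of tmpRestoreDir. Jobs are normally configured through
   * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob} rather than by calling this
   * directly. A minimal sketch of direct use (the snapshot name and restore path below are
   * illustrative only, not defaults):
   * <pre>{@code
   * Map<String, Collection<Scan>> snapshotScans =
   *     ImmutableMap.of("snapshot1", ImmutableList.of(new Scan()));
   * MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans,
   *     new Path("/tmp/snapshot_restore_dir"));
   * }</pre>
   */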
  public static void setInput(Configuration configuration,
      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
  }
}