/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat}, allowing a
 * MapReduce job to run over one or more table snapshots, with one or more scans configured for
 * each. Internally, the input format delegates to {@link TableSnapshotInputFormat} and thus has
 * the same performance advantages; see {@link TableSnapshotInputFormat} for more details.
 * <p>
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes a map from snapshot name to a collection of scans. For
 * each snapshot in the map, each corresponding scan will be applied; the overall dataset for the
 * job is defined by the concatenation of the regions and tables included in each snapshot/scan
 * pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map, Class, Class, Class,
 * org.apache.hadoop.mapreduce.Job, boolean, org.apache.hadoop.fs.Path)} can be used to configure
 * the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
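 * <p>
 * The mapper referenced above is an ordinary {@link TableMapper}; a minimal sketch follows (the
 * class name and output types are illustrative, not part of this API):
 * <pre>{@code
 * public static class MyTableMapper extends TableMapper<ImmutableBytesWritable, Put> {
 *   protected void map(ImmutableBytesWritable key, Result value, Context context)
 *       throws IOException, InterruptedException {
 *     // Rows from every configured snapshot/scan pair arrive here, exactly as they
 *     // would from TableSnapshotInputFormat over a single snapshot.
 *     context.write(key, new Put(value.getRow()));
 *   }
 * }
 * }</pre>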
 * <p>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link TableSnapshotInputFormat} for more notes on permissions; the same caveats apply
 * here.
 *
 * @see TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

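  /**
   * Delegates split calculation to {@link MultiTableSnapshotInputFormatImpl} and wraps each
   * resulting split in a {@link TableSnapshotInputFormat.TableSnapshotRegionSplit}, so the record
   * reader machinery of the parent class can be reused unchanged.
   */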
  @Override
  public List<InputSplit> getSplits(JobContext jobContext)
      throws IOException, InterruptedException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
        delegate.getSplits(jobContext.getConfiguration());
    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());

    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
    }

    return rtn;
  }

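  /**
   * Configure {@code configuration} to read from the given snapshot-to-scans mapping, restoring
   * each snapshot into a subdirectory of {@code tmpRestoreDir}. A minimal sketch of direct use,
   * assuming the {@code snapshotScans} map from the class-level example (the usual entry point is
   * TableMapReduceUtil's initMultiTableSnapshotMapperJob, described above):
   * <pre>{@code
   * MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans,
   *     new Path("/tmp/snapshot_restore_dir"));
   * job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
   * }</pre>
   */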
  public static void setInput(Configuration configuration,
      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
  }
}