/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf,
 * boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for notes on
 * permissions; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
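    // The delegate computes one split per region across all configured snapshot/scan pairs;
    // numSplits is ignored, since the split count is dictated by the snapshots' region layout.
    // Each resulting split is wrapped in the mapred-level TableSnapshotRegionSplit.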
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
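    // Reuse the single-snapshot record reader: each split already identifies the restored
    // region it should scan, so no multi-snapshot logic is needed at read time.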
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   *
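   * A minimal usage sketch follows; the snapshot name and restore path are illustrative, not
   * values this class assumes:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("mySnapshot", Collections.singletonList(new Scan()));
   * MultiTableSnapshotInputFormat.setInput(conf, snapshotScans, new Path("/tmp/restore_dir"));
   * }</pre>
   *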
   * @param conf configuration to populate with the snapshot-to-scans mapping and restore dirs
   * @param snapshotScans map from snapshot name to the scans to run against that snapshot
   * @param restoreDir temporary directory under which each snapshot is restored
   * @throws IOException if restoring a snapshot or serializing the scans fails
   */
  @SuppressWarnings("checkstyle:linelength")
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
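    // Delegates to the shared mapreduce-side implementation, which restores each snapshot
    // under restoreDir and serializes the snapshot-to-scans mapping into conf.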
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}