/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat}, allowing a
 * MapReduce job to run over one or more table snapshots, with one or more scans configured for
 * each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf,
 * boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}
   * and
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}.
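   * <p>
   * A minimal usage sketch; the snapshot name, scan boundaries, and restore path below are
   * illustrative, not prescribed:
   * <pre>{@code
   * // Illustrative values: any existing snapshots and a writable restore directory will do.
   * Configuration conf = HBaseConfiguration.create();
   * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
   *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
   * MultiTableSnapshotInputFormat.setInput(conf, snapshotScans, new Path("/tmp/snapshot_restore_dir"));
   * }
   * </pre>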
   *
   * @param conf          the configuration to populate
   * @param snapshotScans map from snapshot name to the scans to run against that snapshot
   * @param restoreDir    directory under which each snapshot is restored
   * @throws IOException if the snapshots cannot be restored or the configuration cannot be written
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}