/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to run
 * over one or more table snapshots, with one or more scans configured for each. Internally, the
 * input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and
 * thus has the same performance advantages; see
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more details. Usage is
 * similar to TableSnapshotInputFormat, with the following difference:
 * initMultiTableSnapshotMapperJob takes a map from snapshot name to a collection of scans. For
 * each snapshot in the map, each corresponding scan will be applied; the overall dataset for the
 * job is defined by the concatenation of the regions and tables included in each snapshot/scan
 * pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf, boolean, Path)}
 * can be used to configure the job.
 *
 * <pre>
 * {@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 *
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region). See
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for notes on permissions;
 * the same caveats apply here.
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
  implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
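    // Split computation is delegated to the shared mapreduce-side implementation; each resulting
    // split (one per region, per the class javadoc) is then wrapped in the mapred-specific type.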
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
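    // Reading a single split is identical to the single-snapshot case, so reuse the record
    // reader from TableSnapshotInputFormat.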
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}
   * and
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}.
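   * <p>
   * A minimal sketch of calling this method directly with the old mapred API (the snapshot name
   * and restore path below are illustrative, not defaults):
   *
   * <pre>
   * {@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
   *   "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
   * MultiTableSnapshotInputFormat.setInput(job, snapshotScans, new Path("/tmp/snapshot_restore_dir"));
   * job.setInputFormat(MultiTableSnapshotInputFormat.class);
   * }
   * </pre>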
   */
  @SuppressWarnings("checkstyle:linelength")
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
    Path restoreDir) throws IOException {
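    // The shared implementation serializes the snapshot-to-scans mapping and the per-snapshot
    // restore directories into conf, and restores each snapshot under restoreDir.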
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}