/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset
 * for the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
 * Class, Class, Class, JobConf, boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
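 * Once configured, the job is submitted through the classic mapred API as usual; a minimal
 * sketch, assuming the {@code job} configured above:
 * <pre>{@code
 * // Submits the job and blocks until it completes.
 * JobClient.runJob(job);
 * }</pre>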
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for notes on
 * permissions; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

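  // Delegates split computation to the shared mapreduce-side implementation (the numSplits hint
  // is ignored; one split is created per region), then wraps each resulting split in the
  // mapred-specific TableSnapshotRegionSplit.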
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

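  // Creates a record reader over the single restored region backing the given split; snapshot
  // data is read directly from the restored files rather than through the region servers.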
  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
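   * <p>
   * A minimal usage sketch (the snapshot name and restore path below are hypothetical):
   * <pre>{@code
   * Map<String, Collection<Scan>> snapshotScans =
   *     ImmutableMap.of("mySnapshot", ImmutableList.of(new Scan()));  // hypothetical snapshot
   * MultiTableSnapshotInputFormat.setInput(conf, snapshotScans,
   *     new Path("/tmp/snapshot_restore_dir"));
   * }</pre>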
   *
   * @param conf configuration to modify
   * @param snapshotScans map from snapshot name to the scans to run against it
   * @param restoreDir a temporary directory under which each snapshot is restored
   * @throws IOException if the snapshots cannot be restored or the configuration cannot be set
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}