/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * <p>
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * <p>
 * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
 * Class, Class, Class, JobConf, boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * <p>
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
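 * <p>
 * Alternatively, the format can be wired up without TableMapReduceUtil. A minimal sketch (this
 * skips the mapper and output-class setup that initMultiTableSnapshotMapperJob performs;
 * {@code conf}, {@code snapshotScans}, and {@code restoreDir} are as in the example above):
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * job.setInputFormat(MultiTableSnapshotInputFormat.class);
 * MultiTableSnapshotInputFormat.setInput(job, snapshotScans, restoreDir);
 * }</pre>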
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // Delegate split computation to the shared implementation, then wrap each
    // resulting split (one per region) in the mapred-specific split type.
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with each snapshot restored to a subdirectory of
   * restoreDir.
   * <p>
   * Sets
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}
   * and
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}.
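   * <p>
   * A minimal usage sketch (the snapshot name and restore path are placeholders):
   * <pre>{@code
   * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
   *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
   * MultiTableSnapshotInputFormat.setInput(conf, snapshotScans,
   *     new Path("/tmp/snapshot_restore_dir"));
   * }</pre>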
   *
   * @param conf          configuration to update
   * @param snapshotScans map from snapshot name to the scans to apply to that snapshot
   * @param restoreDir    temporary directory under which each snapshot is restored
   * @throws IOException if an error occurs while restoring the snapshots or updating the
   *                     configuration
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}