/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * <p>
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see that class for more details.
 * <p>
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset
 * for the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
 * Class, Class, Class, JobConf, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
    implements InputFormat<ImmutableBytesWritable, Result> {

  /** Implementation that computes the splits and writes the job configuration. */
  private final MultiTableSnapshotInputFormatImpl delegate =
      new MultiTableSnapshotInputFormatImpl();

  /** Creates an input format backed by a fresh {@link MultiTableSnapshotInputFormatImpl}. */
  public MultiTableSnapshotInputFormat() {
  }

  /**
   * Asks the delegate for the splits of the configured snapshots and wraps each one in a
   * {@link TableSnapshotRegionSplit} so it can be used with the {@code mapred} API.
   *
   * @param job job configuration handed to the delegate
   * @param numSplits not referenced by this implementation
   * @return one wrapped split per split returned by the delegate, in the same order
   * @throws IOException propagated from the delegate's {@code getSplits}
   */
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> implSplits = delegate.getSplits(job);
    InputSplit[] wrapped = new InputSplit[implSplits.size()];
    int next = 0;
    for (TableSnapshotInputFormatImpl.InputSplit implSplit : implSplits) {
      wrapped[next++] = new TableSnapshotRegionSplit(implSplit);
    }
    return wrapped;
  }

  /**
   * Builds a {@link TableSnapshotRecordReader} over the given split.
   *
   * @param split split to read; cast to {@link TableSnapshotRegionSplit} without a type check
   * @param job job configuration passed to the record reader
   * @param reporter not referenced by this implementation
   * @return a record reader for {@code split}
   * @throws IOException propagated from the record reader's construction
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    TableSnapshotRegionSplit regionSplit = (TableSnapshotRegionSplit) split;
    return new TableSnapshotRecordReader(regionSplit, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir.
   * Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   *
   * @param conf configuration to populate
   * @param snapshotScans map from snapshot name to the scans to apply to that snapshot
   * @param restoreDir directory under which the snapshots are restored
   * @throws IOException propagated from {@link MultiTableSnapshotInputFormatImpl}'s
   *     {@code setInput}
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    MultiTableSnapshotInputFormatImpl impl = new MultiTableSnapshotInputFormatImpl();
    impl.setInput(conf, snapshotScans, restoreDir);
  }

}