/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat} allowing a MapReduce job to run
 * over one or more table snapshots, with one or more scans configured for each. Internally, the
 * input format delegates to {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and
 * thus has the same performance advantages; see
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more details. Usage is
 * similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans. For
 * each snapshot in the map, each corresponding scan will be applied; the overall dataset for the
 * job is defined by the concatenation of the regions and tables included in each snapshot/scan
 * pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf, boolean, Path)}
 * can be used to configure the job.
 *
 * <pre>
 * {@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 *
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region). See
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
  implements InputFormat<ImmutableBytesWritable, Result> {

  /** Implementation that computes the splits for this input format. */
  private final MultiTableSnapshotInputFormatImpl delegate;

  /** Creates an input format backed by a new {@link MultiTableSnapshotInputFormatImpl}. */
  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  /**
   * Asks the delegate for the splits of the job and wraps each one in a
   * {@code TableSnapshotRegionSplit} so it can be handed to the {@code mapred} API.
   * @param job       job configuration passed through to the delegate
   * @param numSplits not used by this implementation; the number of returned splits is whatever
   *                  the delegate produces
   * @return one wrapped split per split returned by the delegate, in the same order
   * @throws IOException if the delegate fails to compute the splits
   */
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < results.length; i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  /**
   * Creates the record reader for a split previously produced by
   * {@link #getSplits(JobConf, int)}.
   * @param split    the split to read; it is cast to {@code TableSnapshotRegionSplit}, so a split
   *                 of any other type results in a {@link ClassCastException}
   * @param job      job configuration handed to the record reader
   * @param reporter not used by this implementation
   * @return a new {@code TableSnapshotRecordReader} over the given split
   * @throws IOException if the record reader cannot be created
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  // NOTE(review): the annotation is kept ahead of the doc comment as in the original, presumably
  // so that the checkstyle line-length suppression also covers the long {@link} lines - confirm
  // before reordering.
  @SuppressWarnings("checkstyle:linelength")
  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   * @param conf          configuration to populate
   * @param snapshotScans map from snapshot name to the scans to run against that snapshot
   * @param restoreDir    base directory under which the snapshots are restored
   * @throws IOException if the underlying implementation fails to configure the input
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
    Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}