/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf,
 * boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *     MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory.
 * Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
  implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir.
   * Sets: {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   *
   * @param conf          configuration to receive the snapshot-to-scan mappings and restore directories
   * @param snapshotScans map from snapshot name to the scans to run against that snapshot
   * @param restoreDir    base directory under which each snapshot is restored
   * @throws IOException if an error occurs while configuring or restoring the snapshots
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}