/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes
 * {@link org.apache.hadoop.hbase.mapred.TableSnapshotInputFormat}, allowing a MapReduce job to
 * run over one or more table snapshots, with one or more scans configured for each.
 * Internally, the input format delegates to
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} and thus has the same
 * performance advantages; see {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
 * for more details.
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf,
 * boolean, Path)} can be used to configure the job.
 * <pre>{@code
 * JobConf job = new JobConf(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *   "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *   "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *   snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *   MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
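 * Alternatively, the same snapshot-to-scan mapping can be applied to a plain
 * {@link Configuration} (rather than a JobConf) through
 * {@link #setInput(Configuration, Map, Path)}. A minimal sketch, reusing the
 * {@code snapshotScans} and {@code restoreDir} from the example above:
 * <pre>{@code
 * // sketch only: any Configuration works; HBaseConfiguration.create() is just one way to get one
 * Configuration otherConf = HBaseConfiguration.create();
 * MultiTableSnapshotInputFormat.setInput(otherConf, snapshotScans, restoreDir);
 * }</pre>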
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
 * permissioning; the same caveats apply here.
 *
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
  implements InputFormat<ImmutableBytesWritable, Result> {

  private final MultiTableSnapshotInputFormatImpl delegate;

  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split,
    JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  @SuppressWarnings("checkstyle:linelength")
  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets:
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
   * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
   * @param conf          configuration to update
   * @param snapshotScans map from snapshot name to the scans to run against that snapshot
   * @param restoreDir    directory under which each snapshot is restored
   * @throws IOException if the snapshot inputs cannot be set up
   */
  public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
    Path restoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
  }

}