/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat}, allowing a
 * MapReduce job to run over one or more table snapshots, with one or more scans configured for
 * each.
 * <p>
 * Internally, the input format delegates to {@link TableSnapshotInputFormat} and thus has the
 * same performance advantages; see {@link TableSnapshotInputFormat} for more details.
 * <p>
 * Usage is similar to TableSnapshotInputFormat, with the following exception:
 * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a collection of scans.
 * For each snapshot in the map, each corresponding scan will be applied; the overall dataset for
 * the job is defined by the concatenation of the regions and tables included in each
 * snapshot/scan pair.
 * <p>
 * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map, Class, Class, Class,
 * org.apache.hadoop.mapreduce.Job, boolean, org.apache.hadoop.fs.Path)}
 * can be used to configure the job.
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf);
 * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
 *   "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
 *   "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
 * );
 * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
 * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
 *   snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
 *   MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 *
 * Internally, this input format restores each snapshot into a subdirectory of the given tmp
 * directory. Input splits and record readers are created as described in
 * {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} (one per region).
 * <p>
 * See {@link TableSnapshotInputFormat} for more notes on permissioning; the same caveats apply
 * here.
 *
 * @see TableSnapshotInputFormat
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {

  /** Implementation object that split computation is forwarded to. */
  private final MultiTableSnapshotInputFormatImpl delegate;

  /**
   * Creates an input format backed by a fresh {@link MultiTableSnapshotInputFormatImpl}.
   */
  public MultiTableSnapshotInputFormat() {
    this.delegate = new MultiTableSnapshotInputFormatImpl();
  }

  /**
   * Computes the input splits for the job.
   * <p>
   * The splits are obtained from the delegate using the job's configuration, and each one is
   * wrapped in a {@link TableSnapshotInputFormat.TableSnapshotRegionSplit} so that it can be
   * returned as a MapReduce {@link InputSplit}.
   *
   * @param jobContext the job whose configuration is handed to the delegate
   * @return one wrapped split per split reported by the delegate, in the delegate's order
   * @throws IOException if the delegate fails while computing the splits
   * @throws InterruptedException declared as part of this method's signature; no statement
   *           visible in this method raises it directly
   */
  @Override
  public List<InputSplit> getSplits(JobContext jobContext)
      throws IOException, InterruptedException {
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
        delegate.getSplits(jobContext.getConfiguration());
    // Pre-size the result: exactly one wrapper is produced per delegate split.
    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());

    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
    }

    return rtn;
  }

  /**
   * Registers the snapshots and scans to read on the given configuration.
   * <p>
   * This is a thin static entry point: it forwards all three arguments unchanged to
   * {@link MultiTableSnapshotInputFormatImpl#setInput} on a newly created implementation
   * instance, which defines what is actually written to the configuration.
   *
   * @param configuration the job configuration to update
   * @param snapshotScans map from snapshot name to the scans to apply to that snapshot
   * @param tmpRestoreDir temporary directory handed to the implementation for restoring the
   *          snapshots
   * @throws IOException if the implementation fails while setting up the input
   */
  public static void setInput(Configuration configuration,
      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
  }
}