001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.mapred; 020 021import org.apache.hadoop.fs.Path; 022import org.apache.hadoop.hbase.HRegionInfo; 023import org.apache.hadoop.hbase.HTableDescriptor; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.apache.hadoop.hbase.client.Result; 026import org.apache.hadoop.hbase.client.Scan; 027import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 028import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; 029import org.apache.hadoop.hbase.util.RegionSplitter; 030import org.apache.hadoop.mapred.InputFormat; 031import org.apache.hadoop.mapred.InputSplit; 032import org.apache.hadoop.mapred.JobConf; 033import org.apache.hadoop.mapred.RecordReader; 034import org.apache.hadoop.mapred.Reporter; 035 036import java.io.DataInput; 037import java.io.DataOutput; 038import java.io.IOException; 039import java.util.List; 040 041/** 042 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further 043 * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}. 044 * 045 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat 046 */ 047@InterfaceAudience.Public 048public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> { 049 050 public static class TableSnapshotRegionSplit implements InputSplit { 051 private TableSnapshotInputFormatImpl.InputSplit delegate; 052 053 // constructor for mapreduce framework / Writable 054 public TableSnapshotRegionSplit() { 055 this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); 056 } 057 058 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { 059 this.delegate = delegate; 060 } 061 062 public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, 063 List<String> locations, Scan scan, Path restoreDir) { 064 this.delegate = 065 new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); 066 } 067 068 @Override 069 public long getLength() throws IOException { 070 return delegate.getLength(); 071 } 072 073 @Override 074 public String[] getLocations() throws IOException { 075 return delegate.getLocations(); 076 } 077 078 @Override 079 public void write(DataOutput out) throws IOException { 080 delegate.write(out); 081 } 082 083 @Override 084 public void readFields(DataInput in) throws IOException { 085 delegate.readFields(in); 086 } 087 } 088 089 static class TableSnapshotRecordReader 090 implements RecordReader<ImmutableBytesWritable, Result> { 091 092 private TableSnapshotInputFormatImpl.RecordReader delegate; 093 094 public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job) 095 throws IOException { 096 delegate = new TableSnapshotInputFormatImpl.RecordReader(); 097 delegate.initialize(split.delegate, job); 098 } 099 100 @Override 101 public boolean next(ImmutableBytesWritable key, Result value) throws IOException { 102 if (!delegate.nextKeyValue()) { 103 return false; 104 } 105 ImmutableBytesWritable currentKey = delegate.getCurrentKey(); 106 key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength()); 107 value.copyFrom(delegate.getCurrentValue()); 108 return true; 109 } 110 111 @Override 112 public ImmutableBytesWritable createKey() { 113 return new ImmutableBytesWritable(); 114 } 115 116 @Override 117 public Result createValue() { 118 return new Result(); 119 } 120 121 @Override 122 public long getPos() throws IOException { 123 return delegate.getPos(); 124 } 125 126 @Override 127 public void close() throws IOException { 128 delegate.close(); 129 } 130 131 @Override 132 public float getProgress() throws IOException { 133 return delegate.getProgress(); 134 } 135 } 136 137 @Override 138 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { 139 List<TableSnapshotInputFormatImpl.InputSplit> splits = 140 TableSnapshotInputFormatImpl.getSplits(job); 141 InputSplit[] results = new InputSplit[splits.size()]; 142 for (int i = 0; i < splits.size(); i++) { 143 results[i] = new TableSnapshotRegionSplit(splits.get(i)); 144 } 145 return results; 146 } 147 148 @Override 149 public RecordReader<ImmutableBytesWritable, Result> 150 getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { 151 return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); 152 } 153 154 /** 155 * Configures the job to use TableSnapshotInputFormat to read from a snapshot. 156 * @param job the job to configure 157 * @param snapshotName the name of the snapshot to read from 158 * @param restoreDir a temporary directory to restore the snapshot into. Current user should 159 * have write permissions to this directory, and this should not be a subdirectory of rootdir. 160 * After the job is finished, restoreDir can be deleted. 161 * @throws IOException if an error occurs 162 */ 163 public static void setInput(JobConf job, String snapshotName, Path restoreDir) 164 throws IOException { 165 TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir); 166 } 167 168 /** 169 * Configures the job to use TableSnapshotInputFormat to read from a snapshot. 170 * @param job the job to configure 171 * @param snapshotName the name of the snapshot to read from 172 * @param restoreDir a temporary directory to restore the snapshot into. Current user should 173 * have write permissions to this directory, and this should not be a subdirectory of rootdir. 174 * After the job is finished, restoreDir can be deleted. 175 * @param splitAlgo split algorithm to generate splits from region 176 * @param numSplitsPerRegion how many input splits to generate per one region 177 * @throws IOException if an error occurs 178 */ 179 public static void setInput(JobConf job, String snapshotName, Path restoreDir, 180 RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException { 181 TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo, numSplitsPerRegion); 182 } 183}