/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * An {@code org.apache.hadoop.mapred} {@link InputFormat} that lets a MapReduce job run over a
 * table snapshot. Every operation is forwarded to {@link TableSnapshotInputFormatImpl}; the nested
 * classes only adapt its split and record reader to the {@code mapred} interfaces. Further
 * documentation is available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {

  /**
   * A {@code mapred} {@link InputSplit} that wraps a
   * {@link TableSnapshotInputFormatImpl.InputSplit} and forwards every call to it.
   */
  public static class TableSnapshotRegionSplit implements InputSplit {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    /** No-argument constructor needed by the MapReduce framework / Writable machinery. */
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    /**
     * Wraps an already-built split.
     * @param delegate the split every call is forwarded to
     */
    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    /**
     * Builds the wrapped split from its parts; all arguments are handed unchanged to the
     * {@link TableSnapshotInputFormatImpl.InputSplit} constructor.
     */
    public TableSnapshotRegionSplit(TableDescriptor htd, RegionInfo regionInfo,
      List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
        new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    /** Returns the length reported by the wrapped split. */
    @Override
    public long getLength() throws IOException {
      return this.delegate.getLength();
    }

    /** Returns the locations reported by the wrapped split. */
    @Override
    public String[] getLocations() throws IOException {
      return this.delegate.getLocations();
    }

    /** Serializes the wrapped split to {@code out}. */
    @Override
    public void write(DataOutput out) throws IOException {
      this.delegate.write(out);
    }

    /** Restores the wrapped split's state from {@code in}. */
    @Override
    public void readFields(DataInput in) throws IOException {
      this.delegate.readFields(in);
    }
  }

  /**
   * A {@code mapred} {@link RecordReader} backed by a
   * {@link TableSnapshotInputFormatImpl.RecordReader}.
   */
  static class TableSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {

    private TableSnapshotInputFormatImpl.RecordReader delegate;

    /**
     * Creates the underlying reader and initializes it with the split's wrapped delegate and the
     * job configuration.
     * @param split the split to read
     * @param job   the job configuration passed to the underlying reader
     * @throws IOException if initializing the underlying reader fails
     */
    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
      throws IOException {
      TableSnapshotInputFormatImpl.RecordReader reader =
        new TableSnapshotInputFormatImpl.RecordReader();
      reader.initialize(split.delegate, job);
      this.delegate = reader;
    }

    /**
     * Advances the underlying reader by one record. When a record is available, {@code key} is set
     * from the current key's array, offset and length, and the current value is copied into
     * {@code value}.
     * @return {@code true} if a record was read, {@code false} once the underlying reader reports
     *         no further records
     */
    @Override
    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
      if (delegate.nextKeyValue()) {
        ImmutableBytesWritable row = delegate.getCurrentKey();
        key.set(row.get(), row.getOffset(), row.getLength());
        value.copyFrom(delegate.getCurrentValue());
        return true;
      }
      return false;
    }

    /** Returns a fresh, empty key holder. */
    @Override
    public ImmutableBytesWritable createKey() {
      return new ImmutableBytesWritable();
    }

    /** Returns a fresh, empty value holder. */
    @Override
    public Result createValue() {
      return new Result();
    }

    /** Returns the position reported by the underlying reader. */
    @Override
    public long getPos() throws IOException {
      return delegate.getPos();
    }

    /** Closes the underlying reader. */
    @Override
    public void close() throws IOException {
      delegate.close();
    }

    /** Returns the progress reported by the underlying reader. */
    @Override
    public float getProgress() throws IOException {
      return delegate.getProgress();
    }
  }

  /**
   * Asks {@link TableSnapshotInputFormatImpl#getSplits} for the job's splits and wraps each one in
   * a {@link TableSnapshotRegionSplit}, preserving order. The {@code numSplits} argument is not
   * consulted.
   */
  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    List<TableSnapshotInputFormatImpl.InputSplit> implSplits =
      TableSnapshotInputFormatImpl.getSplits(job);
    InputSplit[] wrapped = new InputSplit[implSplits.size()];
    int idx = 0;
    for (TableSnapshotInputFormatImpl.InputSplit implSplit : implSplits) {
      wrapped[idx++] = new TableSnapshotRegionSplit(implSplit);
    }
    return wrapped;
  }

  /**
   * Creates a record reader for the given split, which is cast to
   * {@link TableSnapshotRegionSplit}. The {@code reporter} argument is not used.
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
    TableSnapshotRegionSplit regionSplit = (TableSnapshotRegionSplit) split;
    return new TableSnapshotRecordReader(regionSplit, job);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job          the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir   a temporary directory to restore the snapshot into. The current user
   *                     should have write permissions to this directory, and it should not be a
   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
   *                     deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
    throws IOException {
    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot, generating
   * several input splits per region.
   * @param job                the job to configure
   * @param snapshotName       the name of the snapshot to read from
   * @param restoreDir         a temporary directory to restore the snapshot into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of rootdir. After the job is finished,
   *                           restoreDir can be deleted.
   * @param splitAlgo          split algorithm to generate splits from region
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException if an error occurs
   */
  public static void setInput(JobConf job, String snapshotName, Path restoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo,
      numSplitsPerRegion);
  }
}