/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses
 * HBase servers and directly accesses the underlying files (hfile, recovered edits, wals, etc.)
 * to provide maximum performance. The snapshot does not have to be restored to the live cluster
 * or cloned, which also allows the mapreduce job to be run against an online or offline hbase
 * cluster. The snapshot files can be exported to a pure-hdfs cluster by using the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, and this InputFormat can be used
 * to run the mapreduce job directly over the snapshot files. The snapshot should not be deleted
 * while there are jobs reading from its files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }</pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. By default,
 * and similarly to {@link TableInputFormat}, an InputSplit is created per region, but optionally
 * you can run N mapper tasks per region, in which case the region key range is split into N
 * sub-ranges and an InputSplit is created per sub-range; see the sketch below.
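 * <p>
 * For example, a minimal sketch of asking for two input splits per region via the
 * {@link #setInput(Job, String, Path, RegionSplitter.SplitAlgorithm, int)} overload declared
 * below (the snapshot name and restore directory are illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(conf);
 * job.setInputFormatClass(TableSnapshotInputFormat.class);
 * TableSnapshotInputFormat.setInput(job, "mySnapshot",
 *     new Path("/tmp/snapshot-restore"), new RegionSplitter.UniformSplit(), 2);
 * }</pre>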
 * <p>
 * The region is opened for reading from each RecordReader. An internal RegionScanner is used to
 * execute the {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from snapshot files and data files. To read snapshot files directly from the file system, the
 * user who is running the MR job must have sufficient permissions to access the snapshot and
 * reference files. This means that to run mapreduce over snapshot files, the MR job has to be
 * run as the HBase user or the user must have group or other privileges in the filesystem (see
 * HBASE-8369). Note that granting other users read access to snapshot/data files completely
 * circumvents the access control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
        List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan,
              restoreDir);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }

    /**
     * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
     *             Use {@link #getRegion()} instead.
     */
    @Deprecated
    public HRegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    public RegionInfo getRegion() {
      return delegate.getRegionInfo();
    }

    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
      return this.delegate;
    }
  }

  @InterfaceAudience.Private
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
        new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.context = context;
      delegate.initialize(
          ((TableSnapshotRegionSplit) split).delegate,
          context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
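          // Publish the scan metrics collected by the underlying region
          // scanner to this task's MapReduce counters.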
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<>();
    for (TableSnapshotInputFormatImpl.InputSplit split :
        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   *   should have write permissions to this directory, and it should not be a subdirectory of
   *   the HBase root directory (rootdir). After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   *   should have write permissions to this directory, and it should not be a subdirectory of
   *   the HBase root directory (rootdir). After the job is finished, restoreDir can be deleted.
   * @param splitAlgo the split algorithm used to generate input splits from each region's key
   *   range
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
        splitAlgo, numSplitsPerRegion);
  }
}
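
// A minimal driver sketch using the TableMapReduceUtil helper linked in the class javadoc
// (the mapper, output key/value classes, snapshot name, and restore path below are
// illustrative; output format and dependency-jar details are omitted):
//
//   Job job = Job.getInstance(conf, "snapshot-scan");
//   Scan scan = new Scan();
//   TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", scan,
//       MyTableMapper.class, Text.class, IntWritable.class, job,
//       true, new Path("/tmp/snapshot-restore"));
//   job.setNumReduceTasks(0);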