/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses HBase servers and accesses the underlying files (hfile, recovered edits,
 * wals, etc) directly to provide maximum performance. The snapshot is not required to be
 * restored to the live cluster or cloned. This also allows running the mapreduce job from an
 * online or offline hbase cluster. The snapshot files can be exported by using the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster,
 * and this InputFormat can be used to run the mapreduce job directly over the snapshot files.
 * The snapshot should not be deleted while there are jobs reading from snapshot files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = Job.getInstance(conf);
 * Scan scan = new Scan();
 * Path restoreDir = new Path("/path/to/restore/dir");
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. By default,
 * and similar to {@link TableInputFormat} an InputSplit is created per region, but optionally you
 * can run N mapper tasks per every region, in which case the region key range will be split to
 * N sub-ranges and an InputSplit will be created per sub-range. The region is opened for reading
 * from each RecordReader. An internal RegionScanner is used to execute the
 * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
 * snapshot files and data files.
 * To read from snapshot files directly from the file system, the user who is running the MR job
 * must have sufficient permissions to access snapshot and reference files.
 * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
 * user or the user must have group or other privileges in the filesystem (See HBASE-8369).
 * Note that giving other users access to read from snapshot/data files will completely circumvent
 * the access control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  /**
   * MapReduce {@link InputSplit} that wraps a {@link TableSnapshotInputFormatImpl.InputSplit}
   * and forwards every operation (length, locations, serialization, region lookup) to it.
   */
  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    /** The wrapped split; assigned exactly once by each constructor. */
    private final TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    /**
     * Wraps an already constructed implementation split.
     * @param delegate the split every call on this object is forwarded to
     */
    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    /**
     * Builds the wrapped {@link TableSnapshotInputFormatImpl.InputSplit} from the given
     * arguments, which are passed through unchanged.
     * @param htd table descriptor handed to the implementation split
     * @param regionInfo region handed to the implementation split
     * @param locations locations handed to the implementation split
     * @param scan scan handed to the implementation split
     * @param restoreDir restore directory handed to the implementation split
     */
    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
        List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    /** @return the length reported by the wrapped split */
    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    /** @return the locations reported by the wrapped split */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    /** Serializes this split by letting the wrapped split write itself to {@code out}. */
    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    /** Deserializes this split by letting the wrapped split read itself from {@code in}. */
    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }

    /**
     * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
     *             Use {@link #getRegion()}
     */
    @Deprecated
    public HRegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    /** @return the region info held by the wrapped split */
    public RegionInfo getRegion() {
      return delegate.getRegionInfo();
    }

    /** @return the wrapped implementation split */
    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
      return this.delegate;
    }
  }

  /**
   * MapReduce {@link RecordReader} that forwards to a
   * {@link TableSnapshotInputFormatImpl.RecordReader} and, after every successfully read
   * record, hands the scanner's {@link ScanMetrics} to
   * {@code TableRecordReaderImpl.updateCounters}.
   */
  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private final TableSnapshotInputFormatImpl.RecordReader delegate =
        new TableSnapshotInputFormatImpl.RecordReader();
    /** Task context captured in {@link #initialize}; null until then. */
    private TaskAttemptContext context;
    /** Result of {@code TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context)}. */
    private Method getCounter;

    /**
     * Remembers the task context, resolves the counter accessor and initializes the wrapped
     * reader with the split's delegate and the task configuration.
     * @param split must be a {@link TableSnapshotRegionSplit}; it is cast without a check
     * @param context the task attempt context supplying the configuration
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.context = context;
      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      // Go through the accessor rather than reaching into the split's private field.
      delegate.initialize(
        ((TableSnapshotRegionSplit) split).getDelegate(),
        context.getConfiguration());
    }

    /**
     * Advances the wrapped reader. When a record was read and both scan metrics and the task
     * context are available, the metrics are passed to
     * {@code TableRecordReaderImpl.updateCounters}.
     * @return whatever the wrapped reader's {@code nextKeyValue()} returned
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          // NOTE(review): the two literal 0 arguments are passed as-is; confirm their meaning
          // against TableRecordReaderImpl.updateCounters.
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
        }
      }
      return result;
    }

    /** @return the current key of the wrapped reader */
    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    /** @return the current value of the wrapped reader */
    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    /** @return the progress reported by the wrapped reader */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    /** Closes the wrapped reader. */
    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  /**
   * Creates a new, not yet initialized {@link TableSnapshotRegionRecordReader}. Neither
   * argument is used here; the reader receives them later through
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)}.
   * @param split ignored by this method
   * @param context ignored by this method
   * @return a fresh record reader
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  /**
   * Computes the splits by asking
   * {@code TableSnapshotInputFormatImpl.getSplits(Configuration)} with the job's configuration
   * and wrapping each returned split in a {@link TableSnapshotRegionSplit}.
   * @param job the job context supplying the configuration
   * @return the wrapped splits, in the order the implementation returned them
   */
  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<>();
    for (TableSnapshotInputFormatImpl.InputSplit split :
        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restoreDir can be deleted.
   * @param splitAlgo split algorithm to generate splits from region
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir,
                              RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
            splitAlgo, numSplitsPerRegion);
  }
}