/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses
 * HBase servers and directly accesses the underlying files (hfile, recovered edits, wals, etc) to
 * provide maximum performance. The snapshot is not required to be restored to the live cluster or
 * cloned. This also allows running the mapreduce job against an online or offline hbase cluster.
 * The snapshot files can be exported by using the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool to a pure-hdfs cluster, and this
 * InputFormat can be used to run the mapreduce job directly over the snapshot files. The snapshot
 * should not be deleted while there are jobs reading from snapshot files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 *
 * <pre>
 * {@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, MyTableMapper.class,
 *   MyMapKeyOutput.class, MyMapOutputValueWritable.class, job, true);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. By default,
 * similar to {@link TableInputFormat}, an InputSplit is created per region, but optionally you can
 * run N mapper tasks per region, in which case the region key range will be split into N
 * sub-ranges and an InputSplit will be created per sub-range.
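 * For example, a minimal sketch of configuring several splits per region via
 * {@link #setInput(Job, String, Path, RegionSplitter.SplitAlgorithm, int)}; the snapshot name,
 * restore directory, split algorithm and split count below are illustrative only:
 *
 * <pre>
 * {@code
 * Job job = Job.getInstance(conf);
 * Path restoreDir = new Path("/tmp/snapshot_restore"); // illustrative restore directory
 * TableSnapshotInputFormat.setInput(job, "mySnapshot", restoreDir,
 *   new RegionSplitter.UniformSplit(), 4); // illustrative: 4 input splits per region
 * // ... submit and wait for the job, then clean up the restored files ...
 * TableSnapshotInputFormat.cleanRestoreDir(job, "mySnapshot");
 * }
 * </pre>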
 * <p>
 * The region is opened for reading by each RecordReader. An internal RegionScanner is used to
 * execute the {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from snapshot files and data files. To read the snapshot files directly from the file system,
 * the user who is running the MR job must have sufficient permissions to access snapshot and
 * reference files. This means that to run mapreduce over snapshot files, the MR job has to be run
 * as the HBase user or the user must have group or other privileges in the filesystem (See
 * HBASE-8369). Note that giving other users access to read from snapshot/data files will
 * completely circumvent the access control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
      List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
        new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }

    /**
     * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
     *             {@link #getRegion()} instead.
     */
    @Deprecated
    public HRegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    public RegionInfo getRegion() {
      return delegate.getRegionInfo();
    }

    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
      return this.delegate;
    }
  }

  @InterfaceAudience.Private
  static class TableSnapshotRegionRecordReader
    extends RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      this.context = context;
      delegate.initialize(((TableSnapshotRegionSplit) split).delegate, context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
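          // Export the scan metrics collected so far (bytes in results, rows scanned, etc.)
          // into the MapReduce task counters so they appear in the job's counter output.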
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<>();
    for (TableSnapshotInputFormatImpl.InputSplit split : TableSnapshotInputFormatImpl
      .getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job          the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir   a temporary directory to restore the snapshot into. The current user
   *                     should have write permissions to this directory, and it should not be a
   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
   *                     deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job                the job to configure
   * @param snapshotName       the name of the snapshot to read from
   * @param restoreDir         a temporary directory to restore the snapshot into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of rootdir. After the job is finished,
   *                           restoreDir can be deleted.
   * @param splitAlgo          split algorithm to generate splits from a region
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
      splitAlgo, numSplitsPerRegion);
  }

  /**
   * Cleans up the restore directory after a snapshot scan job.
   * @param job          the snapshot scan job
   * @param snapshotName the name of the snapshot to read from
   * @throws IOException if an error occurs
   */
  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
    TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
  }
}