/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses
 * HBase servers and directly accesses the underlying files (hfile, recovered edits, wals, etc.)
 * to provide maximum performance. The snapshot is not required to be restored to the live cluster
 * or cloned. This also allows the MapReduce job to be run against an online or offline HBase
 * cluster. The snapshot files can be exported to a pure-HDFS cluster using the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, and this InputFormat can then be
 * used to run the MapReduce job directly over the snapshot files. The snapshot should not be
 * deleted while there are jobs reading from the snapshot files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 *
 * <pre>
 * {@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, MyTableMapper.class,
 *   MyMapKeyOutput.class, MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. By default,
 * and similar to {@link TableInputFormat}, an InputSplit is created per region, but optionally
 * you can run N mapper tasks per region, in which case the region key range will be split into N
 * sub-ranges and an InputSplit will be created per sub-range.
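 * <p>
 * For example, a minimal sketch of requesting multiple splits per region through
 * {@link #setInput(Job, String, Path, RegionSplitter.SplitAlgorithm, int)}; here
 * {@code snapshotName}, {@code restoreDir} and the split count of 8 are placeholder values:
 *
 * <pre>
 * {@code
 * // Restore the snapshot under restoreDir and split each region's key range
 * // uniformly into 8 sub-ranges, yielding 8 input splits per region.
 * TableSnapshotInputFormat.setInput(job, snapshotName, restoreDir,
 *   new RegionSplitter.UniformSplit(), 8);
 * }
 * </pre>
 * Once the job has finished, {@link #cleanRestoreDir(Job, String)} can be called to remove the
 * files restored under the restore directory.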
 * <p>
 * The region is opened for reading by each RecordReader. An internal RegionScanner is used to
 * execute the {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from snapshot files and data files. To read the snapshot files directly from the file system,
 * the user running the MR job must have sufficient permissions to access the snapshot and
 * reference files. This means that to run mapreduce over snapshot files, the MR job has to be run
 * as the HBase user or the user must have group or other privileges in the filesystem (see
 * HBASE-8369). Note that granting other users read access to the snapshot/data files will
 * completely circumvent the access control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(TableDescriptor htd, RegionInfo regionInfo,
      List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
        new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }

    public RegionInfo getRegion() {
      return delegate.getRegionInfo();
    }

    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
      return this.delegate;
    }
  }

  @InterfaceAudience.Private
  static class TableSnapshotRegionRecordReader
    extends RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      this.context = context;
      delegate.initialize(((TableSnapshotRegionSplit) split).delegate, context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<>();
    for (TableSnapshotInputFormatImpl.InputSplit split : TableSnapshotInputFormatImpl
      .getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job          the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir   a temporary directory to restore the snapshot into. Current user should
   *                     have write permissions to this directory, and this should not be a
   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
   *                     deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job                the job to configure
   * @param snapshotName       the name of the snapshot to read from
   * @param restoreDir         a temporary directory to restore the snapshot into. Current user
   *                           should have write permissions to this directory, and this should
   *                           not be a subdirectory of rootdir. After the job is finished,
   *                           restoreDir can be deleted.
   * @param splitAlgo          split algorithm to generate splits from region
   * @param numSplitsPerRegion how many input splits to generate per one region
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
      splitAlgo, numSplitsPerRegion);
  }

  /**
   * Clean the restore directory after the snapshot scan job.
   * @param job          the snapshot scan job
   * @param snapshotName the name of the snapshot to read from
   * @throws IOException if an error occurs
   */
  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
    TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
  }
}