/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses the HBase servers and accesses the underlying files (hfiles, recovered edits,
 * HLogs, etc.) directly to provide maximum performance. The snapshot is not required to be
 * restored to the live cluster or cloned. This also allows the MapReduce job to be run against
 * an online or offline HBase cluster. The snapshot files can be exported to a pure-HDFS cluster
 * using the {@link ExportSnapshot} tool, and this InputFormat can be used to run the MapReduce
 * job directly over the snapshot files. The snapshot should not be deleted while there are jobs
 * reading from the snapshot files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
 *   boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. Similar to
 * {@link TableInputFormat}, an InputSplit is created per region. The region is opened for reading
 * by each RecordReader. An internal RegionScanner is used to execute the {@link Scan} obtained
 * from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
 * snapshot files and data files. HBase also enforces security because all the requests are handled
 * by the server layer, and the user cannot read from the data files directly.
 * To read snapshot files directly from the file system, the user running the MR job
 * must have sufficient permissions to access the snapshot and reference files.
 * This means that to run MapReduce over snapshot files, the MR job has to be run as the HBase
 * user or the user must have group or other privileges in the filesystem (see HBASE-8369).
 * Note that granting other users read access to the snapshot/data files will completely circumvent
 * the access control enforced by HBase.
 * @see TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.

  private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";

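  /**
   * An {@link InputSplit} for a single snapshot region, identified by its encoded region name,
   * together with the hosts that hold the most local blocks for that region.
   */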
  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private String regionName;
    private String[] locations;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() { }

    TableSnapshotRegionSplit(String regionName, List<String> locations) {
      this.regionName = regionName;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return locations;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
          MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
            .setRegion(RegionSpecifier.newBuilder()
              .setType(RegionSpecifierType.ENCODED_REGION_NAME)
              .setValue(HBaseZeroCopyByteString.wrap(Bytes.toBytes(regionName))).build());

      for (String location : locations) {
        builder.addLocations(location);
      }

      MapReduceProtos.TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      MapReduceProtos.TableSnapshotRegionSplit split =
          MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }

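  /**
   * RecordReader that opens the restored snapshot region with a {@link ClientSideRegionScanner}
   * and iterates over the results of the configured {@link Scan}.
   */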
  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotRegionSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;
    private TaskAttemptContext context;
    private Method getCounter;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {

      Configuration conf = context.getConfiguration();
      this.split = (TableSnapshotRegionSplit) split;
      String regionName = this.split.regionName;
      String snapshotName = getSnapshotName(conf);
      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
      FileSystem fs = rootDir.getFileSystem(conf);

      // the user-specified root directory where the snapshot was restored
      Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY));

      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);

      // load table descriptor
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

      // load region descriptor
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      // create scan
      String scanStr = conf.get(TableInputFormat.SCAN);
      if (scanStr == null) {
        throw new IllegalArgumentException("A Scan is not configured for this job");
      }
      scan = TableMapReduceUtil.convertStringToScan(scanStr);
      // region is immutable, this should be fine,
      // otherwise we have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
      if (context != null) {
        this.context = context;
        getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());

      ScanMetrics scanMetrics = scanner.getScanMetrics();
      if (scanMetrics != null && context != null) {
        TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
      }

      return true;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return row;
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return result;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0; // TODO: use total bytes to estimate
    }

    @Override
    public void close() throws IOException {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    String snapshotName = getSnapshotName(conf);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    Set<String> snapshotRegionNames
        = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
    if (snapshotRegionNames == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    // load table descriptor
    HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);

    Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (String regionName : snapshotRegionNames) {
      // load region descriptor
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
          hri.getStartKey(), hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new TableSnapshotRegionSplit(regionName, hosts));
      }
    }

    return splits;
  }

  /**
   * Computes the locations to be passed from the InputSplit. MR/YARN schedulers do not take
   * weights into account, and will treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment is
   * used. In the expected stable case, only one location will contain most of the blocks as local;
   * with favored node assignment, three nodes will contain highly local blocks. Here we apply a
   * simple heuristic: we pass every host whose block locality is at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of that of the host with the
   * best locality.
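   * <p>
   * As an illustration with made-up numbers: if the hosts returned by
   * {@link HDFSBlocksDistribution#getTopHostsWithWeights()} have weights 100, 85 and 70, the
   * cutoff is 100 * 0.8 = 80, so only the first two hosts are returned as split locations.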
   */
  @VisibleForTesting
  List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: include every host whose weight is at least cutoffMultiplier times the weight of
    // the most local host
    double cutoffMultiplier
        = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
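   * <p>
   * A minimal usage sketch (the snapshot name and restore path below are placeholders):
   * <pre>{@code
   * Job job = new Job(conf);
   * Path restoreDir = new Path("/tmp/snapshot_restore");  // any writable, non-rootdir location
   * TableSnapshotInputFormat.setInput(job, "mySnapshot", restoreDir);
   * }</pre>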
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   *   have write permissions to this directory, and it should not be a subdirectory of the HBase
   *   root directory. The directory can be deleted after the job is finished.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
    Configuration conf = job.getConfiguration();
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    // restore the snapshot into a unique subdirectory of the supplied restoreDir
    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: restore from record readers to parallelize.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(TABLE_DIR_KEY, restoreDir.toString());
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }
}