/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The
 * job bypasses HBase servers and accesses the underlying files
 * (hfiles, recovered edits, hlogs, etc.) directly to provide maximum performance.
 * The snapshot does not need to be restored or cloned, and the mapreduce job
 * can be run against an online or offline hbase cluster. The snapshot
 * files can also be exported to a pure-hdfs cluster with the ExportSnapshot
 * tool, and this InputFormat can then be used to run the mapreduce job directly
 * over the exported snapshot files.
 * <p>
 * Usage is similar to TableInputFormat.
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
 * boolean, Path)} can be used to configure the job.
 *
 * <pre>
 * {
 *   &#064;code
 *   Job job = new Job(conf);
 *   Scan scan = new Scan();
 *   TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
 *       MyTableMapper.class, MyMapKeyOutput.class,
 *       MyMapOutputValueWritable.class, job, true, tmpDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp
 * directory. Similar to {@link TableInputFormat}, an {@link InputSplit} is created per region,
 * and each RecordReader opens its region for reading. An internal
 * RegionScanner is used to execute the Scan obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read from
 * snapshot files and data files. HBase normally enforces security because all requests are handled
 * by the server layer, and the user cannot read from the data files directly.
 * To read snapshot files directly from the file system, the user who is running the MR job
 * must have sufficient permissions to access the snapshot and reference files.
 * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
 * user, or the user must have group or other privileges in the filesystem (see HBASE-8369).
 * Note that giving other users read access to the snapshot/data files completely circumvents
 * the access control enforced by HBase.
 */
public final class TableSnapshotInputFormat extends
    InputFormat<ImmutableBytesWritable, Result> {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.

  private static final String SNAPSHOT_NAME_KEY = "hbase.mr.snapshot.input.name";
  private static final String TABLE_DIR_KEY = "hbase.mr.snapshot.input.table.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * Snapshot region split.
   */
  public static final class TableSnapshotRegionSplit extends InputSplit implements
      Writable {
    private String regionName;
    private String[] locations;

    /**
     * Constructor for serialization.
     */
    public TableSnapshotRegionSplit() {
    }

    /**
     * Constructor.
     *
     * @param regionName
     *          Region name
     * @param locationList
     *          List of nodes with the region's HDFS blocks, in descending order
     *          of weight
     */
    public TableSnapshotRegionSplit(final String regionName,
        final List<String> locationList) {
      this.regionName = regionName;

      // only use the top node
      List<String> list = locationList.size() > 1 ? locationList.subList(0, 1)
          : locationList;
      this.locations = list.toArray(new String[list.size()]);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return locations.length;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return locations;
    }

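    // Wire format: the region name as a Text string, followed by the number of locations and
    // then each location as a Text string.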
    @Override
    public void readFields(DataInput in) throws IOException {
      regionName = Text.readString(in);
      int locLength = in.readInt();
      locations = new String[locLength];
      for (int i = 0; i < locLength; i++) {
        locations[i] = Text.readString(in);
      }
    }

    @Override
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, regionName);
      out.writeInt(locations.length);
      for (String l : locations) {
        Text.writeString(out, l);
      }
    }
  }

  /**
   * Snapshot region record reader.
   */
  public static final class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    static final Log LOG = LogFactory.getLog(TableSnapshotRegionRecordReader.class);

    // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
    private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";

    private TableSnapshotRegionSplit split;
    private HRegion region;
    private Scan scan;
    private RegionScanner scanner;
    private List<KeyValue> values;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private boolean more;
    private ScanMetrics scanMetrics = null;
    private TaskAttemptContext context = null;
    private Method getCounter = null;

    @Override
    public void initialize(final InputSplit aSplit,
        final TaskAttemptContext context) throws IOException,
        InterruptedException {
      Configuration conf = context.getConfiguration();
      this.split = (TableSnapshotRegionSplit) aSplit;

      Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
      FileSystem fs = rootDir.getFileSystem(conf);

      String snapshotName = getSnapshotName(conf);
      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
          snapshotName, rootDir);

      // load region descriptor
      String regionName = this.split.regionName;
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);

      // create scan
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
      // region is immutable, this should be fine, otherwise we have to set the
      // thread read point...
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      scan.setCacheBlocks(false);

      // load table descriptor
      HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
          snapshotDir);
      Path tableDir = new Path(conf.get(TABLE_DIR_KEY));

      // open region from the snapshot directory
      this.region = openRegion(tableDir, fs, conf, hri, htd);

      // create region scanner
      this.scanner = region.getScanner(scan);
      values = new ArrayList<KeyValue>();
      this.more = true;
      this.scanMetrics = new ScanMetrics();

      if (context != null) {
        this.context = context;
        getCounter = retrieveGetCounterWithStringsParams(context);
      }
      region.startRegionOperation();
    }

    private HRegion openRegion(final Path tableDir, final FileSystem fs,
        final Configuration conf, final HRegionInfo hri,
        final HTableDescriptor htd) throws IOException {
      HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null);
      r.initialize(null);
      return r;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      values.clear();
      // RegionScanner.next() has a different contract than RecordReader.nextKeyValue(): the
      // scanner signals that no value was read by returning empty results, and its boolean
      // return value indicates whether more rows exist AFTER the current one.
      if (!more) {
        updateCounters();
        return false;
      }
      more = scanner.nextRaw(values, scan.getBatch(), null);
      if (values.isEmpty()) {
        // we are done
        updateCounters();
        return false;
      }
      for (KeyValue kv : values) {
        this.scanMetrics.countOfBytesInResults.inc(kv.getLength());
      }
      this.result = new Result(values);
      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());

      return true;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException,
        InterruptedException {
      return row;
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return result;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public void close() throws IOException {
      try {
        if (this.scanner != null) {
          this.scanner.close();
        }
      } finally {
        if (region != null) {
          region.closeRegionOperation();
          region.close(true);
        }
      }
    }

    /**
     * If HBase runs on a new version of mapreduce, the RecordReader has access to
     * counters and can update them based on scanMetrics.
     * If HBase runs on an old version of mapreduce, it cannot access the counters
     * and this record reader cannot update counter values.
     * @throws IOException
     */
    private void updateCounters() throws IOException {
      // we can get access to counters only if hbase uses the new mapreduce APIs
      if (this.getCounter == null) {
        return;
      }

      MetricsTimeVaryingLong[] mlvs =
        scanMetrics.getMetricsTimeVaryingLongArray();

      try {
        for (MetricsTimeVaryingLong mlv : mlvs) {
          Counter ct = (Counter)this.getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, mlv.getName());
          ct.increment(mlv.getCurrentIntervalValue());
        }
      } catch (Exception e) {
        LOG.debug("Can't update counter: " + StringUtils.stringifyException(e));
      }
    }

    /**
     * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
     * Check whether the getCounter(String, String) method is available.
     * @return The getCounter method, or null if it is not available.
     * @throws IOException
     */
    private Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
    throws IOException {
      Method m = null;
      try {
        m = context.getClass().getMethod("getCounter",
          new Class [] {String.class, String.class});
      } catch (SecurityException e) {
        throw new IOException("Failed test for getCounter", e);
      } catch (NoSuchMethodException e) {
        // Ignore
      }
      return m;
    }

  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      final InputSplit split, final TaskAttemptContext context)
      throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(final JobContext job) throws IOException,
      InterruptedException {
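    // One split is created for every snapshot region whose key range overlaps the scan, and the
    // split locations are derived from the HDFS block distribution of the region's files.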
    Configuration conf = job.getConfiguration();
    String snapshotName = getSnapshotName(conf);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
        snapshotName, rootDir);

    Set<String> snapshotRegionNames = SnapshotReferenceUtil
        .getSnapshotRegionNames(fs, snapshotDir);
    if (snapshotRegionNames == null) {
      throw new IllegalArgumentException("Snapshot is empty");
    }

    // load table descriptor
    HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
        snapshotDir);

    Scan scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    Path tableDir = new Path(conf.get(TABLE_DIR_KEY));

    List<InputSplit> splits = new ArrayList<InputSplit>(
        snapshotRegionNames.size());
    for (String regionName : snapshotRegionNames) {
      // load region descriptor
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);

      if (keyRangesOverlap(scan.getStartRow(), scan.getStopRow(),
          hri.getStartKey(), hri.getEndKey())) {
        // compute HDFS locations from the snapshot files (which also yields the locations of the
        // referenced hfiles)
        List<String> hosts = getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri.getEncodedName(), tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new TableSnapshotRegionSplit(regionName, hosts));
      }
    }

    return splits;
  }

  private boolean keyRangesOverlap(final byte[] start1, final byte[] end1,
      final byte[] start2, final byte[] end2) {
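    // The ranges overlap iff each range starts before the other one ends; an empty start or end
    // key is treated as unbounded.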
    return (end2.length == 0 || start1.length == 0 || Bytes.compareTo(start1,
        end2) < 0)
        && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
            end1) < 0);
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account, so they treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local; with favored node assignment, three nodes will contain highly local blocks. Here
   * we apply a simple heuristic: we pass every host that has at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
   * host with the best locality.
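   * <p>
   * For example, with the default multiplier of 0.8 and hosts whose block weights are 100, 85 and
   * 60, the cutoff weight is 80, so only the first two hosts are returned as split locations.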
   */
  @VisibleForTesting
  List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: include every host whose block weight is at least cutoffMultiplier times the
    // weight of the top host
    double cutoffMultiplier
      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

  /**
   * Set job input.
   *
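   * <p>
   * A minimal sketch of direct usage; the snapshot name and restore directory below are
   * placeholder values:
   *
   * <pre>
   * {
   *   &#064;code
   *   Job job = new Job(conf);
   *   TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
   *   job.setInputFormatClass(TableSnapshotInputFormat.class);
   * }
   * </pre>
   *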
   * @param job
   *          The job
   * @param snapshotName
   *          The snapshot name
   * @param restoreDir
   *          The directory where the temp table will be created
   * @throws IOException
   *           on error
   */
  public static void setInput(final Job job, final String snapshotName,
      final Path restoreDir) throws IOException {
    Configuration conf = job.getConfiguration();
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
        snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils
        .readSnapshotInfo(fs, snapshotDir);

    // load table descriptor
    HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs,
        snapshotDir);

    Path tableDir = new Path(restoreDir, htd.getNameAsString());
    conf.set(TABLE_DIR_KEY, tableDir.toString());

    MonitoredTask status = TaskMonitor.get().createStatus(
        "Restoring snapshot '" + snapshotName + "' to directory " + tableDir);
    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();

    RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs,
        snapshotDesc, snapshotDir, htd, tableDir, monitor, status);
    helper.restoreHdfsRegions();
  }

  private static String getSnapshotName(final Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }
}