/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
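 * <p>
 * A minimal usage sketch (the snapshot name and restore path below are illustrative; the restore
 * dir must not live under the HBase root dir):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   // serialize the scan the record readers should apply
 *   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
 *   Path restoreDir = new Path("/tmp/snapshot-restore");
 *   TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", restoreDir);
 *   List&lt;TableSnapshotInputFormatImpl.InputSplit&gt; splits =
 *       TableSnapshotInputFormatImpl.getSplits(conf);
 * </pre>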
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.

  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private HTableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
        Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
      }

      this.restoreDir = restoreDir.toString();
    }

    public HTableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      //TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public HTableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
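    // Wire format: a length-prefixed TableSnapshotRegionSplit protobuf message, followed by the
    // scan string and the restore dir written as Bytes byte arrays; readFields() reads the same
    // fields back in the same order.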
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(ProtobufUtil.convertToTableSchema(htd))
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = ProtobufUtil.convertToHTableDesc(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

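    /**
     * Opens a {@link ClientSideRegionScanner} over the region files restored into the split's
     * restore directory. The scan runs with READ_UNCOMMITTED isolation (the restored region is
     * immutable) and with block caching disabled.
     */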
    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      HTableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // region is immutable, this should be fine,
      // otherwise we have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner =
          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

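    /**
     * Advances to the next row of the region; returns false once the scanner is exhausted.
     */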
    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        //we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

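  /**
   * Computes the input splits for the snapshot named in the configuration: the snapshot manifest
   * is read from the HBase root dir, and one split is created for every region that overlaps the
   * configured scan.
   */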
  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
  }

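  /**
   * Extracts the {@link HRegionInfo} of every region recorded in the snapshot manifest.
   * @throws IllegalArgumentException if the manifest contains no region manifests
   */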
  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
    }
    return regionInfos;
  }

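  /**
   * Opens the manifest of the completed snapshot {@code snapshotName} stored under the given
   * HBase root dir.
   */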
  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
      Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

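  /**
   * Rebuilds the Scan from the job configuration: either from the serialized mapreduce
   * {@link TableInputFormat#SCAN} value, or, for the mapred API, by adding each family listed in
   * the space-separated column list.
   */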
  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    return scan;
  }

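  /**
   * Builds one InputSplit per region whose key range overlaps the scan's row range. Each split
   * carries up to three of the best block locations, computed from the region's files under the
   * restore dir (see {@link #getBestLocations(Configuration, HDFSBlocksDistribution)}).
   */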
  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    // load table descriptor
    HTableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (HRegionInfo hri : regionManifests) {
      // load region descriptor

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
          hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
      }
    }

    return splits;
  }

  /**
   * Computes the locations to be passed in the InputSplit. MR/YARN schedulers do not take
   * weights into account, so every location passed in an input split is treated as equal. We do
   * not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment is
   * used. In the expected stable case, only one location will hold most of the blocks locally;
   * with favored node assignment, three nodes will hold highly local blocks. The heuristic here
   * is to pass every host whose locality is at least
   * hbase.tablesnapshotinputformat.locality.cutoff.multiplier (80% by default) of the locality of
   * the best host. For example, with host weights 100, 85 and 50 and the default multiplier,
   * only the first two hosts are returned.
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: keep adding hosts (in descending weight order) while their weight is at least
    // cutoffMultiplier times the weight of the top host.
    double cutoffMultiplier
      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

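  /** Returns the snapshot name from the configuration, failing if none has been set. */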
  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user must
   * have write permission to this directory, and it must not be a subdirectory of the HBase
   * root dir. The restoreDir can be deleted once the job is finished.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: restore from record readers to parallelize.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}