/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hadoop MR API-agnostic implementation of the logic for running MapReduce over table snapshots,
 * shared by the mapred and mapreduce TableSnapshotInputFormats.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.

  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
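
  /*
   * How these keys are populated (a summary of this class, not a contract): setInput(conf,
   * snapshotName, restoreDir) restores the snapshot and fills in SNAPSHOT_NAME_KEY and
   * RESTORE_DIR_KEY; getSplits(conf) additionally expects either TableInputFormat.SCAN or the
   * mapred COLUMN_LIST key to be set so that extractScanFromConf(conf) can build the Scan.
   */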

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private HTableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
        Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
      }

      this.restoreDir = restoreDir.toString();
    }

    public HTableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public HTableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(htd.convert())
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = HTableDescriptor.convert(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }
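
  // Wire format note (derived from write()/readFields() above): an InputSplit is serialized as a
  // length-prefixed TableSnapshotRegionSplit protobuf (table descriptor, region info, locations),
  // followed by the scan string and the restore directory, each written via Bytes.writeByteArray.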

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      HTableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // the restored region is immutable while we scan it, so this is fine;
      // otherwise we would have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner =
          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
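
  /*
   * A minimal read-loop sketch for RecordReader (illustrative; the mapred and mapreduce wrapper
   * input formats normally drive this, and "split"/"conf" stand for a restored-snapshot split and
   * the job configuration):
   *
   *   RecordReader rr = new RecordReader();
   *   rr.initialize(split, conf);
   *   try {
   *     while (rr.nextKeyValue()) {
   *       ImmutableBytesWritable key = rr.getCurrentKey();
   *       Result value = rr.getCurrentValue();
   *       // process key/value
   *     }
   *   } finally {
   *     rr.close();
   *   }
   */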

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support a Scan as its input API. Work around it for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
  }

  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
      Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    return scan;
  }
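
  /*
   * For example (illustrative family names), either of these lets extractScanFromConf build a
   * Scan:
   *
   *   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
   *   // or, for the mapred API, a space-separated list of column families:
   *   conf.set(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST, "cf1 cf2");
   */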

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionInfos, Path restoreDir, Configuration conf) throws IOException {
    // load table descriptor
    HTableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (HRegionInfo hri : regionInfos) {
      // skip offline split parents; their data is referenced by the daughter regions
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
          hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
      }
    }

    return splits;
  }

  /**
   * Computes the locations to be passed in the InputSplit. MR/YARN schedulers do not take
   * weights into account, so they treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local.
   * On the other hand, with favored node assignment, 3 nodes will contain highly local blocks.
   * Here we use a simple heuristic: we pass every host that has at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
   * host with the best locality.
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: keep every host whose weight is at least cutoffMultiplier times the weight of
    // the top host
    double cutoffMultiplier
      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }
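
  /*
   * Worked example with made-up numbers: if the top host has weight 100 and the multiplier is the
   * default 0.8, filterWeight is 80, and the loop keeps adding hosts until the first one whose
   * weight drops below 80 (weights 100, 95, 81, 60 would yield three locations). The early break
   * relies on getTopHostsWithWeights() returning hosts in descending weight order.
   */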

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration to modify
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   * have write permissions to this directory, and it should not be a subdirectory of rootdir.
   * After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: restore from record readers to parallelize.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
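
  /*
   * For example (made-up names): setInput(conf, "snap1", new Path("/tmp/snapshot-restore"))
   * copies the snapshot into a random UUID subdirectory of /tmp/snapshot-restore via
   * RestoreSnapshotHelper.copySnapshotForScanner and records that subdirectory under
   * RESTORE_DIR_KEY; the caller deletes it once the job is finished.
   */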
}