1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import com.google.common.collect.Lists;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
29 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.HTableDescriptor;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
35 import org.apache.hadoop.hbase.client.IsolationLevel;
36 import org.apache.hadoop.hbase.client.Result;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
40 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
41 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
42 import org.apache.hadoop.hbase.regionserver.HRegion;
43 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
44 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
45 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.FSUtils;
48 import org.apache.hadoop.io.Writable;
49
50 import java.io.ByteArrayOutputStream;
51 import java.io.DataInput;
52 import java.io.DataOutput;
53 import java.io.IOException;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.UUID;
57
58
59
60
61 @InterfaceAudience.Private
62 @InterfaceStability.Evolving
63 public class TableSnapshotInputFormatImpl {
64
65
66
67 public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
68
69 private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
70
71 protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
72
73
74 private static final String LOCALITY_CUTOFF_MULTIPLIER =
75 "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
76 private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
77
78
79
80
81 public static class InputSplit implements Writable {
82
83 private HTableDescriptor htd;
84 private HRegionInfo regionInfo;
85 private String[] locations;
86 private String scan;
87 private String restoreDir;
88
89
90 public InputSplit() {}
91
92 public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
93 Scan scan, Path restoreDir) {
94 this.htd = htd;
95 this.regionInfo = regionInfo;
96 if (locations == null || locations.isEmpty()) {
97 this.locations = new String[0];
98 } else {
99 this.locations = locations.toArray(new String[locations.size()]);
100 }
101 try {
102 this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
103 } catch (IOException e) {
104 LOG.warn("Failed to convert Scan to String", e);
105 }
106
107 this.restoreDir = restoreDir.toString();
108 }
109
110 public HTableDescriptor getHtd() {
111 return htd;
112 }
113
114 public String getScan() {
115 return scan;
116 }
117
118 public String getRestoreDir() {
119 return restoreDir;
120 }
121
122 public long getLength() {
123
124 return 0;
125 }
126
127 public String[] getLocations() {
128 return locations;
129 }
130
131 public HTableDescriptor getTableDescriptor() {
132 return htd;
133 }
134
135 public HRegionInfo getRegionInfo() {
136 return regionInfo;
137 }
138
139
140
141 @Override
142 public void write(DataOutput out) throws IOException {
143 TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
144 .setTable(htd.convert())
145 .setRegion(HRegionInfo.convert(regionInfo));
146
147 for (String location : locations) {
148 builder.addLocations(location);
149 }
150
151 TableSnapshotRegionSplit split = builder.build();
152
153 ByteArrayOutputStream baos = new ByteArrayOutputStream();
154 split.writeTo(baos);
155 baos.close();
156 byte[] buf = baos.toByteArray();
157 out.writeInt(buf.length);
158 out.write(buf);
159
160 Bytes.writeByteArray(out, Bytes.toBytes(scan));
161 Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
162
163 }
164
165 @Override
166 public void readFields(DataInput in) throws IOException {
167 int len = in.readInt();
168 byte[] buf = new byte[len];
169 in.readFully(buf);
170 TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
171 this.htd = HTableDescriptor.convert(split.getTable());
172 this.regionInfo = HRegionInfo.convert(split.getRegion());
173 List<String> locationsList = split.getLocationsList();
174 this.locations = locationsList.toArray(new String[locationsList.size()]);
175
176 this.scan = Bytes.toString(Bytes.readByteArray(in));
177 this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
178 }
179 }
180
181
182
183
184 public static class RecordReader {
185 private InputSplit split;
186 private Scan scan;
187 private Result result = null;
188 private ImmutableBytesWritable row = null;
189 private ClientSideRegionScanner scanner;
190
191 public ClientSideRegionScanner getScanner() {
192 return scanner;
193 }
194
195 public void initialize(InputSplit split, Configuration conf) throws IOException {
196 this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
197 this.split = split;
198 HTableDescriptor htd = split.htd;
199 HRegionInfo hri = this.split.getRegionInfo();
200 FileSystem fs = FSUtils.getCurrentFileSystem(conf);
201
202
203
204
205 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
206
207 scan.setCacheBlocks(false);
208
209 scanner =
210 new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
211 }
212
213 public boolean nextKeyValue() throws IOException {
214 result = scanner.next();
215 if (result == null) {
216
217 return false;
218 }
219
220 if (this.row == null) {
221 this.row = new ImmutableBytesWritable();
222 }
223 this.row.set(result.getRow());
224 return true;
225 }
226
227 public ImmutableBytesWritable getCurrentKey() {
228 return row;
229 }
230
231 public Result getCurrentValue() {
232 return result;
233 }
234
235 public long getPos() {
236 return 0;
237 }
238
239 public float getProgress() {
240 return 0;
241 }
242
243 public void close() {
244 if (this.scanner != null) {
245 this.scanner.close();
246 }
247 }
248 }
249
250 public static List<InputSplit> getSplits(Configuration conf) throws IOException {
251 String snapshotName = getSnapshotName(conf);
252
253 Path rootDir = FSUtils.getRootDir(conf);
254 FileSystem fs = rootDir.getFileSystem(conf);
255
256 SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
257
258 List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
259
260
261 Scan scan = extractScanFromConf(conf);
262
263 Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
264
265 return getSplits(scan, manifest, regionInfos, restoreDir, conf);
266 }
267
268 public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
269 List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
270 if (regionManifests == null) {
271 throw new IllegalArgumentException("Snapshot seems empty");
272 }
273
274 List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
275
276 for (SnapshotRegionManifest regionManifest : regionManifests) {
277 regionInfos.add(HRegionInfo.convert(regionManifest.getRegionInfo()));
278 }
279 return regionInfos;
280 }
281
282 public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
283 Path rootDir, FileSystem fs) throws IOException {
284 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
285 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
286 return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
287 }
288
289 public static Scan extractScanFromConf(Configuration conf) throws IOException {
290 Scan scan = null;
291 if (conf.get(TableInputFormat.SCAN) != null) {
292 scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
293 } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
294 String[] columns =
295 conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
296 scan = new Scan();
297 for (String col : columns) {
298 scan.addFamily(Bytes.toBytes(col));
299 }
300 } else {
301 throw new IllegalArgumentException("Unable to create scan");
302 }
303 return scan;
304 }
305
306 public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
307 List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
308
309 HTableDescriptor htd = manifest.getTableDescriptor();
310
311 Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
312
313 List<InputSplit> splits = new ArrayList<InputSplit>();
314 for (HRegionInfo hri : regionManifests) {
315
316 if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
317 continue;
318 }
319
320 if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
321 hri.getEndKey())) {
322
323
324 List<String> hosts = getBestLocations(conf,
325 HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
326
327 int len = Math.min(3, hosts.size());
328 hosts = hosts.subList(0, len);
329 splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
330 }
331 }
332
333 return splits;
334
335 }
336
337
338
339
340
341
342
343
344
345
346
347
348
349 public static List<String> getBestLocations(
350 Configuration conf, HDFSBlocksDistribution blockDistribution) {
351 List<String> locations = new ArrayList<String>(3);
352
353 HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
354
355 if (hostAndWeights.length == 0) {
356 return locations;
357 }
358
359 HostAndWeight topHost = hostAndWeights[0];
360 locations.add(topHost.getHost());
361
362
363 double cutoffMultiplier
364 = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
365
366 double filterWeight = topHost.getWeight() * cutoffMultiplier;
367
368 for (int i = 1; i < hostAndWeights.length; i++) {
369 if (hostAndWeights[i].getWeight() >= filterWeight) {
370 locations.add(hostAndWeights[i].getHost());
371 } else {
372 break;
373 }
374 }
375
376 return locations;
377 }
378
379 private static String getSnapshotName(Configuration conf) {
380 String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
381 if (snapshotName == null) {
382 throw new IllegalArgumentException("Snapshot name must be provided");
383 }
384 return snapshotName;
385 }
386
387
388
389
390
391
392
393
394
395
396 public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
397 throws IOException {
398 conf.set(SNAPSHOT_NAME_KEY, snapshotName);
399
400 Path rootDir = FSUtils.getRootDir(conf);
401 FileSystem fs = rootDir.getFileSystem(conf);
402
403 restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
404
405
406 RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
407
408 conf.set(RESTORE_DIR_KEY, restoreDir.toString());
409 }
410 }