/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots: computes input
 * splits from a snapshot manifest and reads rows from the restored snapshot files directly,
 * without going through the region servers.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {

  // Key under which the snapshot name is stored in the job configuration.
  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // Key for the directory under which the snapshot is restored for scanning.
  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  // See getBestLocations(): hosts whose block weight is at least this fraction of the top
  // host's weight are advertised as split locations.
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
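
  // Example (illustrative): a job that wants stricter locality filtering can raise the
  // multiplier in its configuration before submitting:
  //
  //   conf.setFloat("hbase.tablesnapshotinputformat.locality.cutoff.multiplier", 0.9f);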

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {
    private HTableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;

    // No-arg constructor required by the Writable contract; fields are set by readFields().
    public InputSplit() {}

    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    public long getLength() {
      // Split sizes are not computed from the snapshot files; report 0 to the framework.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public HTableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // Serialize as a length-prefixed TableSnapshotRegionSplit protobuf message.
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(htd.convert())
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = HTableDescriptor.convert(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }
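
  // Round-trip sketch for the Writable implementation above (names are illustrative):
  //
  //   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  //   split.write(new DataOutputStream(bytes));
  //   InputSplit copy = new InputSplit();
  //   copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
  //
  // After readFields(), copy carries the same table descriptor, region info and locations.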

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.split = split;
      HTableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // Root of the directory into which the snapshot was restored (set by setInput()).
      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY));

      // Build the Scan either from the serialized mapreduce Scan, or from the mapred
      // column list.
      if (conf.get(TableInputFormat.SCAN) != null) {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
        String[] columns =
            conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
        scan = new Scan();
        for (String col : columns) {
          scan.addFamily(Bytes.toBytes(col));
        }
      } else {
        throw new IllegalArgumentException("A Scan is not configured for this job");
      }

      // The restored region is immutable, so reading uncommitted data is safe here.
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // One-shot scan over immutable files; caching data blocks would only pollute the cache.
      scan.setCacheBlocks(false);

      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      // Position within the region is not tracked.
      return 0;
    }

    public float getProgress() {
      // Progress within the region is not tracked.
      return 0;
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
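
  // Typical read loop driven by a wrapping RecordReader (sketch; error handling omitted,
  // "split" and "conf" assumed to be in scope):
  //
  //   RecordReader reader = new RecordReader();
  //   reader.initialize(split, conf);
  //   while (reader.nextKeyValue()) {
  //     ImmutableBytesWritable key = reader.getCurrentKey();
  //     Result value = reader.getCurrentValue();
  //     // process the row...
  //   }
  //   reader.close();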

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    // load table descriptor
    HTableDescriptor htd = manifest.getTableDescriptor();

    // Build the Scan the same way initialize() does, so both sides agree on the key range.
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }

    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (SnapshotRegionManifest regionManifest : regionManifests) {
      // load region descriptor
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());

      // Only emit a split if the region's key range overlaps the scan's key range.
      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
          hri.getStartKey(), hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

        // Advertise at most three locations per split.
        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(htd, hri, hosts));
      }
    }

    return splits;
  }
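
  // Example of the overlap filter above, with hypothetical row keys: a scan over [c, f)
  // selects a region spanning [a, d) (the ranges intersect) but skips one spanning [f, h).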

  /**
   * Computes the locations to advertise for an InputSplit. MR/Yarn schedulers do not take
   * block weights into account, so every location passed in a split is treated as equal.
   * Since there is one split per region and a region's blocks are usually spread across the
   * cluster (unless favored-node assignment is in use), we do not blindly pass every host.
   * Instead we apply a simple heuristic: return each host whose block weight is at least
   * 80% (configurable via hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of the
   * weight of the host with the best locality.
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: keep every host with at least cutoffMultiplier of the top host's locality.
    double cutoffMultiplier
        = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // Hosts are sorted by descending weight, so no later host can pass the cutoff.
        break;
      }
    }

    return locations;
  }
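
  // Worked example with hypothetical weights: if the top hosts are h1=100, h2=85 and h3=70,
  // the default 0.8 multiplier gives filterWeight = 80, so h1 and h2 are returned and the
  // loop stops at h3.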

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   *   should have write permissions to this directory, and it should not be a subdirectory
   *   of the HBase root dir. It can be deleted after the job is finished.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    // Use a unique subdirectory so concurrent jobs restoring the same snapshot do not clash.
    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // Restore a read-only copy of the snapshot's files under restoreDir.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
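
  // Usage sketch (illustrative; "job" is a mapreduce Job and the paths are examples):
  //
  //   Configuration conf = job.getConfiguration();
  //   TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/restore"));
  //   List<TableSnapshotInputFormatImpl.InputSplit> splits =
  //       TableSnapshotInputFormatImpl.getSplits(conf);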
}