/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
 */
@InterfaceAudience.Private
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
  // easy way to delegate access.
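
  // Typical end-to-end usage, as a hedged sketch (the snapshot name and restore path below are
  // illustrative only; the public entry points are the mapred/mapreduce TableSnapshotInputFormat
  // classes, which delegate to this implementation):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   TableSnapshotInputFormatImpl.setInput(conf, "exampleSnapshot", new Path("/tmp/restore"));
  //   List<TableSnapshotInputFormatImpl.InputSplit> splits =
  //       TableSnapshotInputFormatImpl.getSplits(conf);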

  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * For MapReduce jobs running multiple mappers per region, determines
   * which split algorithm should be used to find split points for scanners.
   */
  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
  /**
   * For MapReduce jobs running multiple mappers per region, determines the
   * number of splits to generate per region.
   */
  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

  /**
   * Whether to calculate the block locations for splits. Defaults to true.
   * If the computing layer runs outside of the HBase cluster, block locality does not matter.
   * Setting this value to false skips the calculation and saves some time.
   *
   * The access modifier is "public" so that these can be accessed by test classes of
   * both org.apache.hadoop.hbase.mapred
   * and org.apache.hadoop.hbase.mapreduce.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
      "hbase.TableSnapshotInputFormat.locality.enabled";
  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

  /**
   * In some scenarios, only a limited number of rows are scanned on each InputSplit,
   * e.g. for sampling data extraction.
   */
  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
      "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private TableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
        Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
      }

      this.restoreDir = restoreDir.toString();
    }

    public TableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public TableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead
    // of doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(ProtobufUtil.toTableSchema(htd))
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
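   * <p>
   * A minimal usage sketch (hedged; in normal operation the mapred/mapreduce wrapper classes
   * drive this reader rather than user code):
   * <pre>
   *   RecordReader rr = new RecordReader();
   *   rr.initialize(split, conf);
   *   while (rr.nextKeyValue()) {
   *     ImmutableBytesWritable key = rr.getCurrentKey();
   *     Result value = rr.getCurrentValue();
   *     // process the current row
   *   }
   *   rr.close();
   * </pre>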
215 */ 216 public static class RecordReader { 217 private InputSplit split; 218 private Scan scan; 219 private Result result = null; 220 private ImmutableBytesWritable row = null; 221 private ClientSideRegionScanner scanner; 222 private int numOfCompleteRows = 0; 223 private int rowLimitPerSplit; 224 225 public ClientSideRegionScanner getScanner() { 226 return scanner; 227 } 228 229 public void initialize(InputSplit split, Configuration conf) throws IOException { 230 this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); 231 this.split = split; 232 this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0); 233 TableDescriptor htd = split.htd; 234 HRegionInfo hri = this.split.getRegionInfo(); 235 FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf); 236 237 238 // region is immutable, this should be fine, 239 // otherwise we have to set the thread read point 240 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 241 // disable caching of data blocks 242 scan.setCacheBlocks(false); 243 scan.setScanMetricsEnabled(true); 244 245 scanner = 246 new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); 247 } 248 249 public boolean nextKeyValue() throws IOException { 250 result = scanner.next(); 251 if (result == null) { 252 //we are done 253 return false; 254 } 255 256 if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) { 257 return false; 258 } 259 if (this.row == null) { 260 this.row = new ImmutableBytesWritable(); 261 } 262 this.row.set(result.getRow()); 263 return true; 264 } 265 266 public ImmutableBytesWritable getCurrentKey() { 267 return row; 268 } 269 270 public Result getCurrentValue() { 271 return result; 272 } 273 274 public long getPos() { 275 return 0; 276 } 277 278 public float getProgress() { 279 return 0; // TODO: use total bytes to estimate 280 } 281 282 public void close() { 283 if (this.scanner != null) { 284 this.scanner.close(); 285 } 286 } 287 } 288 289 public static List<InputSplit> getSplits(Configuration conf) throws IOException { 290 String snapshotName = getSnapshotName(conf); 291 292 Path rootDir = CommonFSUtils.getRootDir(conf); 293 FileSystem fs = rootDir.getFileSystem(conf); 294 295 SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); 296 297 List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest); 298 299 // TODO: mapred does not support scan as input API. Work around for now. 
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);

    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
  }

  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
    try {
      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
          .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
        NoSuchMethodException | InvocationTargetException e) {
      throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);
    }
  }

  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      regionInfos.add(hri);
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
      Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    return scan;
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir,
      Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
    // load table descriptor
    TableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = CommonFSUtils.getTableDir(restoreDir, htd.getTableName());

    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
        SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
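
    // For each region in the snapshot (split parents were already filtered out by
    // getRegionInfosFromManifest): with numSplits == 1, emit one InputSplit per region that
    // overlaps the scan range; with numSplits > 1, carve the region's key range with the
    // supplied SplitAlgorithm and emit one InputSplit with a correspondingly bounded Scan
    // for every sub-range that overlaps the scan range.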
    List<InputSplit> splits = new ArrayList<>();
    for (HRegionInfo hri : regionManifests) {
      // load region descriptor

      if (numSplits > 1) {
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
          if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
              sp[i + 1])) {
            List<String> hosts =
                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);

            Scan boundedScan = new Scan(scan);
            if (scan.getStartRow().length == 0) {
              boundedScan.withStartRow(sp[i]);
            } else {
              boundedScan.withStartRow(
                  Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
            }

            if (scan.getStopRow().length == 0) {
              boundedScan.withStopRow(sp[i + 1]);
            } else {
              boundedScan.withStopRow(
                  Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
            }

            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
          }
        }
      } else {
        if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
            hri.getStartKey(), hri.getEndKey())) {
          List<String> hosts =
              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
        }
      }
    }

    return splits;
  }

  /**
   * Compute block locations for snapshot files (which will get the locations of the referenced
   * hfiles) only when localityEnabled is true.
   */
  private static List<String> calculateLocationsForInputSplit(Configuration conf,
      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
      throws IOException {
    if (localityEnabled) { // block locality matters
      return getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
    } else { // block locality does not matter
      return null;
    }
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account and thus treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment is
   * used. In the expected stable case, only one location will contain most of the blocks as local.
   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
   * we apply a simple heuristic: we pass all hosts which have at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
   * host with the best locality.
   * Return at most numTopsAtMost locations if there are more than that.
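   * <p>
   * Worked example (weights are illustrative): with the default cutoff multiplier of 0.8 and a
   * top host weight of 100, every host whose weight is at least 80 is returned; because the
   * hosts are sorted by descending weight, the first host below that threshold ends the loop.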
454 */ 455 private static List<String> getBestLocations(Configuration conf, 456 HDFSBlocksDistribution blockDistribution, int numTopsAtMost) { 457 HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); 458 459 if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is 460 return null; 461 } 462 463 if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1 464 numTopsAtMost = 1; 465 } 466 int top = Math.min(numTopsAtMost, hostAndWeights.length); 467 List<String> locations = new ArrayList<>(top); 468 HostAndWeight topHost = hostAndWeights[0]; 469 locations.add(topHost.getHost()); 470 471 if (top == 1) { // only care about the top host 472 return locations; 473 } 474 475 // When top >= 2, 476 // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality 477 double cutoffMultiplier 478 = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); 479 480 double filterWeight = topHost.getWeight() * cutoffMultiplier; 481 482 for (int i = 1; i <= top - 1; i++) { 483 if (hostAndWeights[i].getWeight() >= filterWeight) { 484 locations.add(hostAndWeights[i].getHost()); 485 } else { 486 // As hostAndWeights is in descending order, 487 // we could break the loop as long as we meet a weight which is less than filterWeight. 488 break; 489 } 490 } 491 492 return locations; 493 } 494 495 public static List<String> getBestLocations(Configuration conf, 496 HDFSBlocksDistribution blockDistribution) { 497 // 3 nodes will contain highly local blocks. So default to 3. 498 return getBestLocations(conf, blockDistribution, 3); 499 } 500 501 private static String getSnapshotName(Configuration conf) { 502 String snapshotName = conf.get(SNAPSHOT_NAME_KEY); 503 if (snapshotName == null) { 504 throw new IllegalArgumentException("Snapshot name must be provided"); 505 } 506 return snapshotName; 507 } 508 509 /** 510 * Configures the job to use TableSnapshotInputFormat to read from a snapshot. 511 * @param conf the job to configuration 512 * @param snapshotName the name of the snapshot to read from 513 * @param restoreDir a temporary directory to restore the snapshot into. Current user should have 514 * write permissions to this directory, and this should not be a subdirectory of rootdir. 515 * After the job is finished, restoreDir can be deleted. 516 * @throws IOException if an error occurs 517 */ 518 public static void setInput(Configuration conf, String snapshotName, Path restoreDir) 519 throws IOException { 520 setInput(conf, snapshotName, restoreDir, null, 1); 521 } 522 523 /** 524 * Configures the job to use TableSnapshotInputFormat to read from a snapshot. 525 * @param conf the job to configure 526 * @param snapshotName the name of the snapshot to read from 527 * @param restoreDir a temporary directory to restore the snapshot into. Current user should have 528 * write permissions to this directory, and this should not be a subdirectory of rootdir. 529 * After the job is finished, restoreDir can be deleted. 
   * @param numSplitsPerRegion how many input splits to generate per region
   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
    if (numSplitsPerRegion < 1) {
      throw new IllegalArgumentException("numSplits must be >= 1, " +
          "illegal numSplits : " + numSplitsPerRegion);
    }
    if (splitAlgo == null && numSplitsPerRegion > 1) {
      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
    }
    if (splitAlgo != null) {
      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
    }
    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}