/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
 */
@InterfaceAudience.Private
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshots files are owned in fs by the hbase user. There is no
  // easy way to delegate access.
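
  // Usage sketch (hedged; not part of this class's contract): jobs normally reach this
  // implementation through the mapred/mapreduce TableSnapshotInputFormat wrappers. Assuming a
  // mapreduce Job named "job", a snapshot named "mySnapshot", and a scratch directory that is
  // not under the HBase root dir, setup would look roughly like:
  //
  //   Path restoreDir = new Path("/tmp/snapshot-restore"); // hypothetical scratch dir
  //   TableSnapshotInputFormat.setInput(job, "mySnapshot", restoreDir);
  //   // ...then configure mapper/reducer/output as usual and submit the job.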

  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
      "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * For MapReduce jobs running multiple mappers per region, determines
   * what split algorithm we should be using to find split points for scanners.
   */
  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
  /**
   * For MapReduce jobs running multiple mappers per region, determines
   * the number of splits to generate per region.
   */
  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

  /**
   * Whether to calculate the block location for splits. Defaults to true.
   * If the computing layer runs outside of the HBase cluster, the block locality does not matter.
   * Setting this value to false skips the calculation and saves some time.
   *
   * Set access modifier to "public" so that these can be accessed by test classes of
   * both org.apache.hadoop.hbase.mapred
   * and org.apache.hadoop.hbase.mapreduce.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
      "hbase.TableSnapshotInputFormat.locality.enabled";
  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private TableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
        Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
      }

      this.restoreDir = restoreDir.toString();
    }

    public TableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public TableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
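
    // Wire format note (derived from write()/readFields() below; not a stable, versioned
    // contract): a serialized split is a 4-byte length followed by a TableSnapshotRegionSplit
    // protobuf (table schema, region info, locations), then the Scan string and the restore
    // dir, each written with Bytes.writeByteArray.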
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(ProtobufUtil.toTableSchema(htd))
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      TableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // region is immutable, this should be fine,
      // otherwise we have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);
      scan.setScanMetricsEnabled(true);

      scanner =
          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
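
  // Drive-loop sketch (hedged; this mirrors how the mapred/mapreduce wrappers use the reader,
  // with "split" and "conf" as illustrative locals):
  //
  //   RecordReader rr = new RecordReader();
  //   rr.initialize(split, conf);
  //   while (rr.nextKeyValue()) {
  //     ImmutableBytesWritable key = rr.getCurrentKey();
  //     Result value = rr.getCurrentValue();
  //     // ... process the row ...
  //   }
  //   rr.close();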

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);

    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
  }

  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
    try {
      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
          .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
        NoSuchMethodException | InvocationTargetException e) {
      throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);
    }
  }

  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      regionInfos.add(hri);
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
      Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    return scan;
  }
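
  // Hedged example of the mapred-style configuration consumed by extractScanFromConf() above:
  // when no serialized Scan is present, COLUMN_LIST is split on spaces and each token is added
  // as a column family, e.g. (family names are illustrative):
  //
  //   conf.set(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST, "cf1 cf2");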

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir,
      Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
    // load table descriptor
    TableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
        SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    List<InputSplit> splits = new ArrayList<>();
    for (HRegionInfo hri : regionManifests) {
      // load region descriptor

      if (numSplits > 1) {
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
          if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
              sp[i + 1])) {
            List<String> hosts =
                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);

            Scan boundedScan = new Scan(scan);
            if (scan.getStartRow().length == 0) {
              boundedScan.withStartRow(sp[i]);
            } else {
              boundedScan.withStartRow(
                  Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
            }

            if (scan.getStopRow().length == 0) {
              boundedScan.withStopRow(sp[i + 1]);
            } else {
              boundedScan.withStopRow(
                  Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
            }

            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
          }
        }
      } else {
        if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
            hri.getStartKey(), hri.getEndKey())) {
          List<String> hosts =
              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
        }
      }
    }

    return splits;
  }

  /**
   * Compute block locations for snapshot files (which will get the locations for referred hfiles)
   * only when localityEnabled is true.
   */
  private static List<String> calculateLocationsForInputSplit(Configuration conf,
      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
      throws IOException {
    if (localityEnabled) { // care block locality
      return getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
    } else { // do not care block locality
      return null;
    }
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account, thus will treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are all distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local.
   * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. Here
   * we are doing a simple heuristic, where we will pass all hosts which have at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
   * host with the best locality.
   * Return at most numTopsAtMost locations if there are more than that.
   */
  private static List<String> getBestLocations(Configuration conf,
      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
      return null;
    }

    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
      numTopsAtMost = 1;
    }
    int top = Math.min(numTopsAtMost, hostAndWeights.length);
    List<String> locations = new ArrayList<>(top);
    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    if (top == 1) { // only care about the top host
      return locations;
    }

    // When top >= 2,
    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    double cutoffMultiplier
        = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i <= top - 1; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // As hostAndWeights is in descending order,
        // we could break the loop as long as we meet a weight which is less than filterWeight.
        break;
      }
    }

    return locations;
  }

  public static List<String> getBestLocations(Configuration conf,
      HDFSBlocksDistribution blockDistribution) {
    // 3 nodes will contain highly local blocks. So default to 3.
    return getBestLocations(conf, blockDistribution, 3);
  }
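
  // Worked example of the cutoff heuristic above (illustrative numbers): with top host weights
  // {h1=100, h2=85, h3=60} and the default cutoff multiplier of 0.8, filterWeight = 100 * 0.8
  // = 80, so getBestLocations returns [h1, h2]; h3 is dropped because 60 < 80, and the loop
  // stops there since hosts are sorted by descending weight.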

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
   *   write permissions to this directory, and this should not be a subdirectory of rootdir.
   *   After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    setInput(conf, snapshotName, restoreDir, null, 1);
  }
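
  // Hedged calling sketch for the variant below, splitting each region into four scanner
  // ranges (RegionSplitter.UniformSplit is one of the stock split algorithms; the snapshot
  // name and restore path are illustrative):
  //
  //   setInput(conf, "mySnapshot", new Path("/tmp/snapshot-restore"),
  //       new RegionSplitter.UniformSplit(), 4);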

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
   *   have write permissions to this directory, and this should not be a subdirectory of rootdir.
   *   After the job is finished, restoreDir can be deleted.
   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
    if (numSplitsPerRegion < 1) {
      throw new IllegalArgumentException("numSplits must be >= 1, " +
          "illegal numSplits : " + numSplitsPerRegion);
    }
    if (splitAlgo == null && numSplitsPerRegion > 1) {
      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
    }
    if (splitAlgo != null) {
      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
    }
    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}