/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for {@link TableSnapshotInputFormat}: split computation (locality, splits per region),
 * record reading through a restored snapshot, job configuration, and cleanup of restore
 * directories. Both mocked-MapReduce and full mini-MR-cluster paths are exercised.
 */
@Tag(VerySlowMapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  // Row-key boundaries used by the tests; bbb/yyy bound the loaded data, bbc/yya are
  // interior keys used to exercise scans that start/stop strictly inside the data range.
  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  // Current test method name; used to derive unique table/snapshot names per test.
  private String name;

  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    name = testInfo.getTestMethod().get().getName();
  }

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  /**
   * Verifies that {@link TableSnapshotInputFormatImpl#getBestLocations} returns hosts ordered by
   * block weight and trims hosts whose weight falls below the cutoff relative to the top host.
   */
  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    // No blocks recorded: no locations can be suggested.
    assertNull(TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    // h2's weight (1) is far below h1's (2), so h2 is not reported.
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
    assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
    assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
    assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
    assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);

    assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  /**
   * Verifies {@link TableSnapshotRegionRecordReader#getFilesRead()}: empty before close (even
   * after reading rows), and after close it reports exactly the store files of the restored
   * region.
   */
  @Test
  public void testTableSnapshotRegionRecordReaderGetFilesRead() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    String snapshotName = name + "_snapshot";
    try {
      // Setup: create table, load data, snapshot, and configure job with restore dir
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);

      Configuration conf = UTIL.getConfiguration();
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // Get splits (one per region) and extract delegate split for restore path and region info
      TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
      List<InputSplit> splits = tsif.getSplits(job);
      assertEquals(1, splits.size());

      InputSplit split = splits.get(0);
      assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      TableSnapshotInputFormatImpl.InputSplit implSplit = snapshotRegionSplit.getDelegate();

      // Collect expected store file paths from the restored region directory
      Set<String> expectedFiles = new HashSet<>();
      Path restorePath = new Path(implSplit.getRestoreDir());
      FileSystem fs = restorePath.getFileSystem(conf);
      Path tableDir =
        CommonFSUtils.getTableDir(restorePath, implSplit.getTableDescriptor().getTableName());
      Path regionPath = new Path(tableDir, implSplit.getRegionInfo().getEncodedName());
      FileStatus[] familyDirs = fs.listStatus(regionPath);
      if (familyDirs != null) {
        for (FileStatus fam : familyDirs) {
          if (fam.isDirectory()) {
            FileStatus[] files = fs.listStatus(fam.getPath());
            if (files != null) {
              for (FileStatus f : files) {
                if (f.isFile()) {
                  // Restored files are HFileLinks; resolve back to the referenced HFile name.
                  String referenceFileName = f.getPath().getName();
                  expectedFiles.add(HFileLink.getReferencedHFileName(referenceFileName));
                }
              }
            }
          }
        }
      }
      assertFalse(expectedFiles.isEmpty(),
        "Should have at least one store file after snapshot restore");

      // Create record reader, initialize with split (opens underlying ClientSideRegionScanner)
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());

      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      assertTrue(rr instanceof TableSnapshotRegionRecordReader);
      TableSnapshotRegionRecordReader recordReader = (TableSnapshotRegionRecordReader) rr;
      recordReader.initialize(split, taskAttemptContext);

      // Before close: getFilesRead() must be empty
      Set<Path> filesReadBeforeClose = recordReader.getFilesRead();
      assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing");

      // Read a few key-values; getFilesRead() must still be empty until close
      int count = 0;
      while (count < 3 && recordReader.nextKeyValue()) {
        count++;
      }

      filesReadBeforeClose = recordReader.getFilesRead();
      assertTrue(filesReadBeforeClose.isEmpty(),
        "Should return empty set before closing even after reading");

      // Close reader so underlying scanner reports files successfully read
      recordReader.close();

      // After close: getFilesRead() must match expected store file set
      Set<String> filesReadAfterClose =
        recordReader.getFilesRead().stream().map(Path::getName).collect(Collectors.toSet());

      assertEquals(expectedFiles, filesReadAfterClose, "Should contain all expected file paths");
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  // Counter used by mappers/reducers to flag validation failures. Nested enums are
  // implicitly static, so no modifier is needed.
  public enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    // Tracks that every row key in [bbb, yyy) was seen exactly as expected.
    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // Fails the job if any expected row was missed or an unexpected row was seen.
      rowTracker.validate();
    }
  }

  /**
   * Verifies that initTableSnapshotMapperJob configures the job for the default LruBlockCache and
   * disables BucketCache, so snapshot-reading tasks do not allocate a bucket cache.
   */
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      assertEquals(HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01,
        "Snapshot job should be configured for default LruBlockCache.");
      assertEquals(0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01,
        "Snapshot job should not use BucketCache.");
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Verifies split locality when it is derived from the region location in meta rather than from
   * HDFS block locations.
   */
  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = Job.getInstance(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtil util, String snapshotName, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo) throws Exception {
    final TableName tableName = TableName.valueOf(name);
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

  /** 10 regions x 5 splits per region with an interior start/stop row: expect 40 splits. */
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name);
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /** 10 regions x 5 splits per region with an unbounded scan: expect all 50 splits. */
  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name);
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Verifies that the per-input-split row limit is honored: a real RowCounter job over the
   * snapshot must count exactly (limit x number of regions) rows.
   */
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      assertTrue(job.waitForCompletion(true));
      assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Regression test: after a region split, reading the snapshot must not return duplicate rows,
   * and post-snapshot writes/flushes must not leak into the snapshot scan.
   */
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Verifies how a Scan's ReadType survives serialization: explicit read types persist, DEFAULT
   * is upgraded to STREAM, and the configured override takes precedence over DEFAULT.
   */
  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
        serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
   * does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }

  /**
   * Computes splits for the configured job, validates split count, per-split locality and scan
   * boundaries, then reads every split through a record reader and checks that all rows in
   * [startRow, stopRow) are seen exactly once.
   */
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtil.SeenRowTracker rowTracker = new HBaseTestingUtil.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When it uses region location from meta, the hostname will be "localhost",
          // the location from hdfs block location is "127.0.0.1".
          assertEquals(1, split.getLocations().length);
          assertEquals("localhost", split.getLocations()[0], "Not using region location!");
        } else {
          assertEquals("127.0.0.1", split.getLocations()[0], "Not using region location!");
        }
      } else {
        assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        assertTrue(Bytes.compareTo(startRow, scan.getStartRow()) <= 0,
          Bytes.toStringBinary(startRow) + " should <= "
            + Bytes.toStringBinary(scan.getStartRow()));
      }
      if (stopRow.length > 0) {
        assertTrue(Bytes.compareTo(stopRow, scan.getStopRow()) >= 0,
          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()));
      }
      assertTrue(Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0,
        "startRow should < stopRow");

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtil util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtil util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      // Proves snapshot reads need no live HBase cluster, only the filesystem.
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = Job.getInstance(util.getConfiguration());
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

  /** Verifies that cleanRestoreDir removes the restore directory created during job setup. */
  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    assertFalse(fs.exists(restorePath));
  }

  /**
   * Test that explicitly restores a snapshot to a temp directory and reads the restored regions via
   * ClientSideRegionScanner through a MapReduce job.
   * <p>
   * This test verifies the full workflow: 1. Create and load a table with data 2. Create a snapshot
   * and restore the snapshot to a temporary directory 3. Configure a job to read the restored
   * regions via ClientSideRegionScanner using TableSnapshotInputFormat and verify that it succeeds
   * 4. Delete restored temporary directory 5. Configure a new job and verify that it fails
   */
  @Test
  public void testReadFromRestoredSnapshotViaMR() throws Exception {
    final TableName tableName = TableName.valueOf(name);
    final String snapshotName = tableName + "_snapshot";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }
      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();
      int regionNum = admin.getRegions(tableName).size();
      LOG.info("Created table with {} regions", regionNum);

      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);
      table.close();

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);
      Path tempRestoreDir = UTIL.getDataTestDirOnTestFS("restore_" + snapshotName);
      RestoreSnapshotHelper.copySnapshotForScanner(UTIL.getConfiguration(), fs, rootDir,
        tempRestoreDir, snapshotName);
      assertTrue(fs.exists(tempRestoreDir), "Restore directory should exist");

      Job job = Job.getInstance(UTIL.getConfiguration());
      job.setJarByClass(TestTableSnapshotInputFormat.class);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
      Configuration conf = job.getConfiguration();
      conf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
      conf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
      conf.setInt("hbase.mapreduce.splits.per.region", 1);
      job.setReducerClass(TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);
      TableMapReduceUtil.initTableMapperJob(snapshotName, // table name (snapshot name in this case)
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
        false, false, TableSnapshotInputFormat.class);
      TableMapReduceUtil.resetCacheConfig(conf);
      assertTrue(job.waitForCompletion(true));
      assertTrue(job.isSuccessful());

      // Now verify that job fails when restore directory is deleted
      assertTrue(fs.delete(tempRestoreDir, true));
      assertFalse(fs.exists(tempRestoreDir), "Restore directory should not exist after deletion");
      Job failureJob = Job.getInstance(UTIL.getConfiguration());
      failureJob.setJarByClass(TestTableSnapshotInputFormat.class);
      TableMapReduceUtil.addDependencyJarsForClasses(failureJob.getConfiguration(),
        TestTableSnapshotInputFormat.class);
      Configuration failureConf = failureJob.getConfiguration();
      // Configure job to use the deleted restore directory
      failureConf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
      failureConf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
      failureConf.setInt("hbase.mapreduce.splits.per.region", 1);
      failureJob.setReducerClass(TestTableSnapshotReducer.class);
      failureJob.setNumReduceTasks(1);
      failureJob.setOutputFormatClass(NullOutputFormat.class);

      TableMapReduceUtil.initTableMapperJob(snapshotName, scan, TestTableSnapshotMapper.class,
        ImmutableBytesWritable.class, NullWritable.class, failureJob, false, false,
        TableSnapshotInputFormat.class);
      TableMapReduceUtil.resetCacheConfig(failureConf);

      assertFalse(fs.exists(tempRestoreDir),
        "Restore directory should not exist before job execution");
      failureJob.waitForCompletion(true);

      assertFalse(failureJob.isSuccessful(),
        "Job should fail since the restored snapshot directory is deleted");

    } finally {
      // Best-effort cleanup: failures here must not mask the test result.
      try {
        if (UTIL.getAdmin().tableExists(tableName)) {
          UTIL.deleteTable(tableName);
        }
      } catch (Exception e) {
        LOG.warn("Error deleting table", e);
      }
      try {
        UTIL.getAdmin().deleteSnapshot(snapshotName);
      } catch (Exception e) {
        LOG.warn("Error deleting snapshot", e);
      }
    }
  }
}