/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

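/**
 * Tests for {@link TableSnapshotInputFormat}: split generation (including multiple splits per
 * region), split locality hints, per-split row limits, scanner read-type handling, and cleanup
 * of the snapshot restore directory.
 */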
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(null,
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] { "h1" }, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h2" }, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] { "h3" }, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] { "h4" }, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }
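
  // A note on the expectations above: getBestLocations() sorts hosts by cumulative block
  // weight and, in the current implementation, keeps at most a few hosts (three) whose weight
  // is within a cutoff fraction (0.8 by default) of the heaviest host. For example, with
  // weights {h1=10, h2=9} both hosts are returned because 9 >= 10 * 0.8, whereas with
  // {h1=10, h2=7} only h1 is returned.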

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }
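
  // The mapper/reducer pair above is shared by the tests below: the mapper verifies each row it
  // receives from the snapshot and emits the row key, while the reducer records every key it
  // sees and fails the job in cleanup() if any row between bbb and yyy was missed.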

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals("Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals("Snapshot job should not use BucketCache.", 0,
        job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceSingleRegionByRegionLocation() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true);
    try {
      testWithMockedMapReduce(UTIL, name.getMethodName() + "Snapshot", 1, 1, 1, true);
    } finally {
      conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
    String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(),
        numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
      Job job = new Job(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
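
  // The next two tests exercise numSplitsPerRegion = 5 against a 10-region table: a scan over
  // the whole key space keeps all 10 * 5 = 50 splits, while the narrower [bbc, yya) scan drops
  // the sub-splits that fall entirely outside the scan range, leaving 40.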
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan(bbc, yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }
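
  // The snapshot input format rewrites ReadType.DEFAULT to STREAM when it extracts the scan
  // from the configuration (snapshot jobs read store files directly, so streaming reads are the
  // sensible default), while SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE can force a different read
  // type. The test below pins down both behaviors.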
  @Test
  public void testScannerReadTypeConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    // Explicitly set ReadTypes should persist
    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
      Scan scanWithReadType = new Scan();
      scanWithReadType.setReadType(readType);
      assertEquals(scanWithReadType.getReadType(),
        serializeAndReturn(conf, scanWithReadType).getReadType());
    }
    // We should only see the DEFAULT ReadType getting updated to STREAM.
    Scan scanWithoutReadType = new Scan();
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());

    // We should still be able to force a certain ReadType when DEFAULT is given.
    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
    assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
  }

  /**
   * Serializes and deserializes the given scan in the same manner that TableSnapshotInputFormat
   * does.
   */
  private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  }
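
  /**
   * Pulls the splits from the input format without running a real job: checks the split count,
   * checks each split's locality hint, checks that each split's scan is bounded by the requested
   * startRow/stopRow, then drives the record reader over every split and verifies that all
   * expected rows are seen.
   */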
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
    byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
      stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled = job.getConfiguration().getBoolean(
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean byRegionLoc =
      job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
        SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);
    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
        if (byRegionLoc) {
          // When the location comes from the region's entry in meta, the hostname is
          // "localhost"; when it comes from the HDFS block locations it is "127.0.0.1".
          Assert.assertEquals(1, split.getLocations().length);
          Assert.assertTrue("Not using region location!",
            split.getLocations()[0].equals("localhost"));
        } else {
          Assert.assertTrue("Not using block location!",
            split.getLocations()[0].equals("127.0.0.1"));
        }
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
        TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should be <= "
            + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should be >= "
            + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should be < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
        tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
    String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
    int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
          TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
          true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
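
  // End-to-end check that the full MapReduce pipeline sees every row: with 10 regions and
  // 5 splits per region, 50 mappers are expected.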
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }

  @Test
  public void testCleanRestoreDir() throws Exception {
    TableName tableName = TableName.valueOf("test_table");
    String snapshotName = "test_snapshot";
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
    Job job = Job.getInstance(UTIL.getConfiguration());
    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, new Scan(),
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      workingDir);
    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
    Path restorePath =
      new Path(job.getConfiguration().get("hbase.TableSnapshotInputFormat.restore.dir"));
    Assert.assertTrue(fs.exists(restorePath));
    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
    Assert.assertFalse(fs.exists(restorePath));
  }
}