/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);
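  // Row keys used by the tests below: [bbb, yyy) is the range most tests scan and verify,
  // while bbc/yya exercise a scan whose bounds fall strictly inside that range.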
  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @After
  public void tearDown() throws Exception {
  }

  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(null,
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }
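  // Reducer used by the end-to-end MapReduce tests: it records every row key it receives and,
  // in cleanup(), fails the job if the [bbb, yyy) range was not fully covered.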
  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      rowTracker.validate();
    }
  }

  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
      throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      Job job = new Job(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());

    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }
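  // 10 regions with 5 splits per region would give 50 splits for a full-table scan; the
  // narrower [bbc, yya) scan excludes splits that fall entirely outside its range, which is
  // why only 40 splits are expected below.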
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    setupCluster();
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan(bbc, yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    setupCluster();
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = new Job(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan2 = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_START_ROW);

    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }
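  // Regression test for duplicate rows: the table is split before the snapshot is taken, so the
  // daughter regions can still reference the same parent store files. Reading the snapshot must
  // stay within each region's boundaries, otherwise the same row would be returned twice.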
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }
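  // Shared verification for the mocked tests: checks the split count, that split locations are
  // populated only when locality is enabled, that each split's scan stays inside the requested
  // [startRow, stopRow) range, and that reading every split back covers all expected rows.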
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
        stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled =
        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
            SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
          TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should <= " + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should >= " + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {

    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow); // limit the scan

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}