/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * @param p Directory to clean up
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed to remove " + p);
      }
    }
  }
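
  /**
   * Creates a table with three column families ("a", "b" and "c") and returns its descriptor.
   */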
  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtility.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }

  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
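
  /**
   * Write edits, split the WAL into recovered hfiles, then write a corrupt companion for one
   * recovered hfile per family. Opening the region should fail with CorruptHFileException
   * until HConstants#HREGION_EDITS_REPLAY_SKIP_ERRORS is set; with it set, the corrupt file is
   * skipped (but left in place) and the good edits are still replayed.
   */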
  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail because of the corrupt recovered hfile
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }
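
  /**
   * Write a value and flush, then write a second value at the same timestamp without flushing.
   * After WAL split and replay, the later (unflushed) value must be the one returned.
   */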
  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with the same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
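
  /**
   * Write edits without flushing, recording the sequence id of every cell, then replay them
   * via WAL split and assert that the recovered cells carry their original sequence ids.
   */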
  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells.get(0).getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // Assert the seqids were recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
          cells.get(0).getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening
   * Region again. Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Flush one of the families during
    // the load of edits so its seqid differs from the others, to verify we do the right thing
    // when the seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush the first family so we have at least one family with a seqid different from
        // the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // We can't close wal2. It's been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores were flushed but others were not. Unfortunately, there is no easy hook to flush at
   * a store level. The way we get around this is by flushing at the region level, and then
   * deleting the recently flushed store file for one of the stores. This puts us back in the
   * situation where all but that store got flushed and the region died. We restart the region
   * again, and verify that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
      throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files of the second column family to simulate a failure in between
    // the flushes. We have 3 families; killing the middle one ensures that taking the
    // maximum seqid will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the test, we first
   * abort a flush after writing some data, then write more data and flush again, and at last
   * verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Open the region with a mocked RegionServerServices and a custom store flusher that we
    // can force to throw while flushing.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulated to abort server
      Mockito.doReturn(true).when(rsServices).isAborted();
      // A region normally does not accept writes after a DroppedSnapshotException;
      // we mock around it for this test.
      region.setClosing(false);
    }
    // Write more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // Call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped, {}",
        t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }
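
  /**
   * Returns the number of rows returned by the given scanner.
   */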
  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the original recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}