/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
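
/**
 * Tests for WAL splitting with {@link WALSplitter#WAL_SPLIT_TO_HFILE} enabled, where WAL edits
 * are recovered directly into HFiles under the region directory rather than into
 * recovered.edits files.
 */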
@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * @param p Directory to clean up
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtility.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    // Put one cell per column family, all with the same timestamp.
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }
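
  /**
   * Ensure that splitting the WAL and replaying the recovered hfiles works when the WAL root
   * directory is different from the HBase root directory.
   */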
  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Reopen the region and assert the edits were replayed from the recovered hfiles
    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
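
  /**
   * Ensure a corrupt recovered hfile fails the region open by default, and is skipped (but left
   * in place) once HREGION_EDITS_REPLAY_SKIP_ERRORS is set.
   */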
  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile for each column family
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail on the corrupt recovered hfile
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  /**
   * Ensure that when a flushed cell and an unflushed cell share the same row, family, qualifier
   * and timestamp, the value written later (still only in the WAL) wins after replay.
   */
  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }
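
  /**
   * Ensure that the per-cell sequence ids assigned at write time are preserved when the edits
   * are recovered from hfiles after a WAL split.
   */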
  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush; remember the sequence id of each cell
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells.get(0).getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
          cells.get(0).getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening the region again.
   * Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others, to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // If first, flush so we have at least one family w/ a different seqid to the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect: our seqids are calculated correctly, so
    // all edits in the logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when the region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that the count of cells is the same as before the crash.
        assertEquals(result2.size(), result3.size());

        // We can't close wal2; it was appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores were flushed but others were not. Unfortunately there is no easy hook to flush at a
   * store level; the way we get around this is by flushing at the region level and then deleting
   * the recently flushed store file for one of the stores. This puts us back in the situation
   * where all but that store got flushed and the region died. We restart the region and verify
   * that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
    throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files of the second column family to simulate a failure
    // in between the flushes; we have 3 families, and killing the middle one
    // ensures that taking the maximum seqid across stores would make us fail.
    int cfCount = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cfCount++;
      if (cfCount == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the test we first
   * abort a flush after writing some data, then write more data and flush again, and at last
   * verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Open the region with a custom flusher that can be made to throw, and with mocked
    // RegionServerServices so we can simulate a server abort.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region; the injected exception should abort the flush
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // Simulate a server abort
      Mockito.doReturn(true).when(rsServices).isAborted();
      // The region normally does not accept writes after a DroppedSnapshotException;
      // we mock around it for this test.
      region.setClosing(false);
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped: {}",
        t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }
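
  /**
   * @return the number of rows returned by the given scanner
   */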
  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean hasMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!hasMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.getFileStatus(recoveredHFile).getLen();
    byte[] fileContent = new byte[fileSize];
    try (FSDataInputStream in = fs.open(recoveredHFile)) {
      in.readFully(0, fileContent, 0, fileSize);
    }

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    try (FSDataOutputStream out = fs.create(path)) {
      out.write(fileContent);
      out.write(Bytes.toBytes("-----"));
    }
  }
}