/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /*
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

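  /**
   * Creates a table with three column families ("a", "b" and "c") through the admin and returns
   * its descriptor.
   */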
  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FileSystem fs = hbaseRootDir.getFileSystem(c);
    fs.mkdirs(new Path(hbaseRootDir, logName));
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    fs.mkdirs(new Path(hbaseRootDir, logName));
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtil.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtil.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }

  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(TEST_NAME.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
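    // Each family's edit must have been replayed from the recovered hfiles under the data
    // root dir even though the WAL itself lived under the separate WAL root dir.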
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail on the corrupt recovered hfile
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
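    // Both puts share one timestamp, so the replayed VALUE2 can only win over the flushed
    // VALUE1 if the recovered hfiles preserved the higher sequence id.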
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells[0].getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), cells[0].getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
   */
  @Test
  public void testWrittenViaHRegion()
    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one of the families
    // during the load of edits so its seqid is not the same as the others', to check we do the
    // right thing when seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush after the first family so at least one family has a different seqid from the rest.
        region.flush(true);
        first = false;
      }
    }
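    // At this point the first family has been flushed to a store file while the other two
    // families only have their edits in the WAL.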
    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // We can't close the earlier WAL; it's been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores got flushed but others did not. Unfortunately, there is no easy hook to flush at a
   * store level. The way we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the stores. This puts us back in the situation
   * where all but that store got flushed and the region died. We restart the region and verify
   * that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
    throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families, flush the region, then delete one
    // family's store files to simulate a flush that only partially completed before the
    // region died.
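    // Use a standalone FSHLog (rather than the mini cluster's WAL) so the test fully controls
    // when the log is closed and split.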
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache();
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush: first abort a flush after
   * writing some data, then write more data and flush again, and finally verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write edits spread across the three families and abort the first flush via a custom
    // store flusher that throws on demand.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
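    // A full scan should see every row written before the flush is aborted.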
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulated to abort server
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped, {}",
        t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}