001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits; 021import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.junit.jupiter.api.Assertions.fail; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.security.PrivilegedExceptionAction; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataInputStream; 037import org.apache.hadoop.fs.FSDataOutputStream; 038import org.apache.hadoop.fs.FileStatus; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.ExtendedCell; 043import org.apache.hadoop.hbase.HBaseConfiguration; 
044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.ClientInternalHelper; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Get; 052import org.apache.hadoop.hbase.client.Put; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.RegionInfoBuilder; 055import org.apache.hadoop.hbase.client.Result; 056import org.apache.hadoop.hbase.client.Scan; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; 060import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 061import org.apache.hadoop.hbase.regionserver.HRegion; 062import org.apache.hadoop.hbase.regionserver.RegionScanner; 063import org.apache.hadoop.hbase.regionserver.RegionServerServices; 064import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay; 065import org.apache.hadoop.hbase.regionserver.wal.FSHLog; 066import org.apache.hadoop.hbase.security.User; 067import org.apache.hadoop.hbase.testclassification.MediumTests; 068import org.apache.hadoop.hbase.testclassification.RegionServerTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.CommonFSUtils; 071import org.apache.hadoop.hbase.util.EnvironmentEdge; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.FSTableDescriptors; 074import org.apache.hadoop.hbase.util.Pair; 075import org.junit.jupiter.api.AfterAll; 076import org.junit.jupiter.api.AfterEach; 077import org.junit.jupiter.api.BeforeAll; 078import org.junit.jupiter.api.BeforeEach; 079import org.junit.jupiter.api.Tag; 080import 
org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests WAL splitting with {@code WAL_SPLIT_TO_HFILE} enabled: edits recovered from the WAL are
 * written out as HFiles rather than recovered.edits, and regions reopened afterwards must see the
 * recovered data with correct values, timestamps and sequence ids.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestWALSplitToHFile {

  // Fix: log under this class, not AbstractTestWALReplay (copy-paste from the class these
  // tests were derived from), so log output is attributed correctly.
  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  private String testMethodName;

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // The feature under test: split WAL entries directly into HFiles.
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    testMethodName = testInfo.getTestMethod().get().getName();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(testMethodName + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    // Start each test from a clean root dir so leftovers from a prior test cannot interfere.
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, testMethodName);
  }

  @AfterEach
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * Recursively delete a directory if it exists.
   * @param p Directory to cleanup
   * @throws IOException if the delete fails
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  /**
   * Create a table with three column families ("a", "b", "c") named after the current test
   * method, both in the cluster and as a returned descriptor.
   */
  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  /** Create an initialized FSHLog under {@code hbaseRootDir/logName} using the given conf. */
  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FileSystem fs = hbaseRootDir.getFileSystem(c);
    fs.mkdirs(new Path(hbaseRootDir, logName));
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  /** Overload that uses an explicit FileSystem (e.g. a separate WAL filesystem) and this.conf. */
  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    fs.mkdirs(new Path(hbaseRootDir, logName));
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

  /**
   * Create the test table plus an on-disk region for it, then close the region so tests can
   * reopen it against their own WAL.
   */
  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(testMethodName);
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = CommonFSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtil.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtil.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  /** Put VALUE1 under ROW/QUALIFIER into every column family, all at the same timestamp. */
  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }

  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = CommonFSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(testMethodName + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, testMethodName);

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Failed to reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exist
      FileStatus[] files =
        WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      // The replayed edit (VALUE2) must win over the flushed one at the same timestamp.
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
          cells[0].getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        ExtendedCell[] cells = ClientInternalHelper.getExtendedRawCells(result);
        assertEquals(1, cells.length);
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), cells[0].getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
   */
  @Test
  public void testWrittenViaHRegion()
    throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not same as
    // others to test we do right thing when different seqids.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // If first, so we have at least one family w/ different seqid to rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      // Best effort: a split failure shows up below when the reopened region misses edits.
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // I can't close wal1. Its been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores
   * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level.
   * The way we get around this is by flushing at the region level, and then deleting the recently
   * flushed store file for one of the Stores. This would put us back in the situation where all but
   * that store got flushed and the region died. We restart Region again, and verify that the edits
   * were replayed.
   */
  @Test
  public void testAfterPartialFlush()
    throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not same as
    // others to test we do right thing when different seqids.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache();
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we could recover the data correctly after aborting flush. In the test, first we abort
   * flush after writing some data, then writing more data and flush again, at last verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not same as
    // others to test we do right thing when different seqids.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    // CustomStoreFlusher lets us inject a flush failure on demand.
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulated to abort server
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info(
        "Expected exception when flushing region because server is stopped," + t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  /** Count the rows (not cells) returned by the scanner until it is exhausted. */
  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  /**
   * Replace nothing: write a sibling "<name>.corrupt" copy of the given recovered hfile with
   * garbage appended, so that opening it as an HFile fails.
   */
  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile. try-with-resources fixes a stream leak in the original,
    // which would not close the streams if readFully/write threw.
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    byte[] fileContent = new byte[fileSize];
    try (FSDataInputStream in = fs.open(recoveredHFile)) {
      in.readFully(0, fileContent, 0, fileSize);
    }

    // Write a corrupt hfile by append garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    try (FSDataOutputStream out = fs.create(path)) {
      out.write(fileContent);
      out.write(Bytes.toBytes("-----"));
    }
  }
}