001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver.wal; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.anyInt; 027import static org.mockito.ArgumentMatchers.eq; 028import static org.mockito.Mockito.doAnswer; 029import static org.mockito.Mockito.spy; 030import static org.mockito.Mockito.when; 031 032import java.io.FilterInputStream; 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.security.PrivilegedExceptionAction; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Collection; 039import java.util.HashSet; 040import java.util.List; 041import java.util.NavigableMap; 042import java.util.Set; 043import java.util.TreeMap; 044import java.util.concurrent.atomic.AtomicBoolean; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionInfo; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.MiniHBaseCluster; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.ResultScanner; 068import org.apache.hadoop.hbase.client.Scan; 069import org.apache.hadoop.hbase.client.Table; 070import org.apache.hadoop.hbase.monitoring.MonitoredTask; 071import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 072import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 073import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 074import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 075import org.apache.hadoop.hbase.regionserver.FlushRequester; 076import org.apache.hadoop.hbase.regionserver.HRegion; 077import org.apache.hadoop.hbase.regionserver.HRegionServer; 078import org.apache.hadoop.hbase.regionserver.HStore; 079import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 080import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 081import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 082import org.apache.hadoop.hbase.regionserver.Region; 083import org.apache.hadoop.hbase.regionserver.RegionScanner; 084import org.apache.hadoop.hbase.regionserver.RegionServerServices; 085import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 086import org.apache.hadoop.hbase.security.User; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 089import org.apache.hadoop.hbase.util.EnvironmentEdge; 090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 091import org.apache.hadoop.hbase.util.FSUtils; 092import org.apache.hadoop.hbase.util.HFileTestUtil; 093import org.apache.hadoop.hbase.util.Pair; 094import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 095import org.apache.hadoop.hbase.wal.WAL; 096import org.apache.hadoop.hbase.wal.WALEdit; 097import org.apache.hadoop.hbase.wal.WALFactory; 098import org.apache.hadoop.hbase.wal.WALKeyImpl; 099import org.apache.hadoop.hbase.wal.WALSplitter; 100import org.apache.hadoop.hdfs.DFSInputStream; 101import org.junit.After; 102import org.junit.AfterClass; 103import org.junit.Before; 104import org.junit.BeforeClass; 105import org.junit.Rule; 106import org.junit.Test; 107import org.junit.rules.TestName; 108import org.mockito.Mockito; 109import org.mockito.invocation.InvocationOnMock; 110import org.mockito.stubbing.Answer; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113 114/** 115 * Test replay of edits out of a WAL split. 116 */ 117public abstract class AbstractTestWALReplay { 118 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 119 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 120 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 121 private Path hbaseRootDir = null; 122 private String logName; 123 private Path oldLogDir; 124 private Path logDir; 125 private FileSystem fs; 126 private Configuration conf; 127 private WALFactory wals; 128 129 @Rule 130 public final TestName currentTest = new TestName(); 131 132 133 @BeforeClass 134 public static void setUpBeforeClass() throws Exception { 135 Configuration conf = TEST_UTIL.getConfiguration(); 136 // The below config supported by 0.20-append and CDH3b2 137 conf.setInt("dfs.client.block.recovery.retries", 2); 138 TEST_UTIL.startMiniCluster(3); 139 Path hbaseRootDir = 140 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 141 LOG.info("hbase.rootdir=" + hbaseRootDir); 142 FSUtils.setRootDir(conf, hbaseRootDir); 143 } 144 145 @AfterClass 146 public static void tearDownAfterClass() throws Exception { 147 TEST_UTIL.shutdownMiniCluster(); 148 } 149 150 @Before 151 public void setUp() throws Exception { 152 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 153 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 154 this.hbaseRootDir = FSUtils.getRootDir(this.conf); 155 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 156 String serverName = 157 ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, System.currentTimeMillis()) 158 .toString(); 159 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 160 this.logDir = new Path(this.hbaseRootDir, logName); 161 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 162 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 163 } 164 this.wals = new WALFactory(conf, currentTest.getMethodName()); 165 } 166 167 @After 168 public void tearDown() throws Exception { 169 this.wals.close(); 170 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 171 } 172 173 /* 174 * @param p Directory to cleanup 175 */ 176 private void deleteDir(final Path p) throws IOException { 177 if (this.fs.exists(p)) { 178 if (!this.fs.delete(p, true)) { 179 throw new IOException("Failed remove of " + p); 180 } 181 } 182 } 183 184 /** 185 * 186 * @throws Exception 187 */ 188 @Test 189 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 190 final TableName tableName = 191 TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 192 byte[] family1 = Bytes.toBytes("cf1"); 193 byte[] family2 = Bytes.toBytes("cf2"); 194 byte[] qualifier = Bytes.toBytes("q"); 195 byte[] value = Bytes.toBytes("testV"); 196 byte[][] familys = { family1, family2 }; 197 TEST_UTIL.createTable(tableName, familys); 198 Table htable = TEST_UTIL.getConnection().getTable(tableName); 199 Put put = new Put(Bytes.toBytes("r1")); 200 put.addColumn(family1, qualifier, value); 201 htable.put(put); 202 ResultScanner resultScanner = htable.getScanner(new Scan()); 203 int count = 0; 204 while (resultScanner.next() != null) { 205 count++; 206 } 207 resultScanner.close(); 208 assertEquals(1, count); 209 210 MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 211 List<HRegion> regions = hbaseCluster.getRegions(tableName); 212 assertEquals(1, regions.size()); 213 214 // move region to another regionserver 215 Region destRegion = regions.get(0); 216 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 217 assertTrue("Please start more than 1 regionserver", 218 hbaseCluster.getRegionServerThreads().size() > 1); 219 int destServerNum = 0; 220 while (destServerNum == originServerNum) { 221 destServerNum++; 222 } 223 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 224 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 225 // move region to destination regionserver 226 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 227 228 // delete the row 229 Delete del = new Delete(Bytes.toBytes("r1")); 230 htable.delete(del); 231 resultScanner = htable.getScanner(new Scan()); 232 count = 0; 233 while (resultScanner.next() != null) { 234 count++; 235 } 236 resultScanner.close(); 237 assertEquals(0, count); 238 239 // flush region and make major compaction 240 HRegion region = 241 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 242 region.flush(true); 243 // wait to complete major compaction 244 for (HStore store : region.getStores()) { 245 store.triggerMajorCompaction(); 246 } 247 region.compact(true); 248 249 // move region to origin regionserver 250 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 251 // abort the origin regionserver 252 originServer.abort("testing"); 253 254 // see what we get 255 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 256 if (result != null) { 257 assertTrue("Row is deleted, but we get" + result.toString(), 258 (result == null) || result.isEmpty()); 259 } 260 resultScanner.close(); 261 } 262 263 /** 264 * Tests for hbase-2727. 265 * @throws Exception 266 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 267 */ 268 @Test 269 public void test2727() throws Exception { 270 // Test being able to have > 1 set of edits in the recovered.edits directory. 271 // Ensure edits are replayed properly. 272 final TableName tableName = 273 TableName.valueOf("test2727"); 274 275 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 276 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 277 Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); 278 deleteDir(basedir); 279 280 HTableDescriptor htd = createBasic3FamilyHTD(tableName); 281 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 282 HBaseTestingUtility.closeRegionAndWAL(region2); 283 final byte [] rowName = tableName.getName(); 284 285 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 286 // Add 1k to each family. 287 final int countPerFamily = 1000; 288 289 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 290 for(byte[] fam : htd.getFamiliesKeys()) { 291 scopes.put(fam, 0); 292 } 293 for (HColumnDescriptor hcd: htd.getFamilies()) { 294 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, 295 wal1, htd, mvcc, scopes); 296 } 297 wal1.shutdown(); 298 runWALSplit(this.conf); 299 300 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 301 // Add 1k to each family. 302 for (HColumnDescriptor hcd: htd.getFamilies()) { 303 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, 304 ee, wal2, htd, mvcc, scopes); 305 } 306 wal2.shutdown(); 307 runWALSplit(this.conf); 308 309 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 310 try { 311 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); 312 long seqid = region.getOpenSeqNum(); 313 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 314 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 315 assertTrue(seqid > mvcc.getWritePoint()); 316 assertEquals(seqid - 1, mvcc.getWritePoint()); 317 LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " 318 + mvcc.getReadPoint()); 319 320 // TODO: Scan all. 321 region.close(); 322 } finally { 323 wal3.close(); 324 } 325 } 326 327 /** 328 * Test case of HRegion that is only made out of bulk loaded files. Assert 329 * that we don't 'crash'. 330 * @throws IOException 331 * @throws IllegalAccessException 332 * @throws NoSuchFieldException 333 * @throws IllegalArgumentException 334 * @throws SecurityException 335 */ 336 @Test 337 public void testRegionMadeOfBulkLoadedFilesOnly() 338 throws IOException, SecurityException, IllegalArgumentException, 339 NoSuchFieldException, IllegalAccessException, InterruptedException { 340 final TableName tableName = 341 TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 342 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 343 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 344 deleteDir(basedir); 345 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 346 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 347 HBaseTestingUtility.closeRegionAndWAL(region2); 348 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 349 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 350 351 byte [] family = htd.getFamilies().iterator().next().getName(); 352 Path f = new Path(basedir, "hfile"); 353 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 354 Bytes.toBytes("z"), 10); 355 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 356 hfs.add(Pair.newPair(family, f.toString())); 357 region.bulkLoadHFiles(hfs, true, null); 358 359 // Add an edit so something in the WAL 360 byte[] row = tableName.getName(); 361 region.put((new Put(row)).addColumn(family, family, family)); 362 wal.sync(); 363 final int rowsInsertedCount = 11; 364 365 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 366 367 // Now 'crash' the region by stealing its wal 368 final Configuration newConf = HBaseConfiguration.create(this.conf); 369 User user = HBaseTestingUtility.getDifferentUser(newConf, 370 tableName.getNameAsString()); 371 user.runAs(new PrivilegedExceptionAction() { 372 @Override 373 public Object run() throws Exception { 374 runWALSplit(newConf); 375 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 376 377 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), 378 hbaseRootDir, hri, htd, wal2); 379 long seqid2 = region2.getOpenSeqNum(); 380 assertTrue(seqid2 > -1); 381 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 382 383 // I can't close wal1. Its been appropriated when we split. 384 region2.close(); 385 wal2.close(); 386 return null; 387 } 388 }); 389 } 390 391 /** 392 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 393 * files) and an edit in the memstore. 394 * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries 395 * from being replayed" 396 * @throws IOException 397 * @throws IllegalAccessException 398 * @throws NoSuchFieldException 399 * @throws IllegalArgumentException 400 * @throws SecurityException 401 */ 402 @Test 403 public void testCompactedBulkLoadedFiles() 404 throws IOException, SecurityException, IllegalArgumentException, 405 NoSuchFieldException, IllegalAccessException, InterruptedException { 406 final TableName tableName = 407 TableName.valueOf("testCompactedBulkLoadedFiles"); 408 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 409 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 410 deleteDir(basedir); 411 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 412 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 413 HBaseTestingUtility.closeRegionAndWAL(region2); 414 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 415 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 416 417 // Add an edit so something in the WAL 418 byte [] row = tableName.getName(); 419 byte [] family = htd.getFamilies().iterator().next().getName(); 420 region.put((new Put(row)).addColumn(family, family, family)); 421 wal.sync(); 422 423 List <Pair<byte[],String>> hfs= new ArrayList<>(1); 424 for (int i = 0; i < 3; i++) { 425 Path f = new Path(basedir, "hfile"+i); 426 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 427 Bytes.toBytes(i + "50"), 10); 428 hfs.add(Pair.newPair(family, f.toString())); 429 } 430 region.bulkLoadHFiles(hfs, true, null); 431 final int rowsInsertedCount = 31; 432 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 433 434 // major compact to turn all the bulk loaded files into one normal file 435 region.compact(true); 436 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 437 438 // Now 'crash' the region by stealing its wal 439 final Configuration newConf = HBaseConfiguration.create(this.conf); 440 User user = HBaseTestingUtility.getDifferentUser(newConf, 441 tableName.getNameAsString()); 442 user.runAs(new PrivilegedExceptionAction() { 443 @Override 444 public Object run() throws Exception { 445 runWALSplit(newConf); 446 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 447 448 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), 449 hbaseRootDir, hri, htd, wal2); 450 long seqid2 = region2.getOpenSeqNum(); 451 assertTrue(seqid2 > -1); 452 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 453 454 // I can't close wal1. Its been appropriated when we split. 455 region2.close(); 456 wal2.close(); 457 return null; 458 } 459 }); 460 } 461 462 463 /** 464 * Test writing edits into an HRegion, closing it, splitting logs, opening 465 * Region again. Verify seqids. 466 * @throws IOException 467 * @throws IllegalAccessException 468 * @throws NoSuchFieldException 469 * @throws IllegalArgumentException 470 * @throws SecurityException 471 */ 472 @Test 473 public void testReplayEditsWrittenViaHRegion() 474 throws IOException, SecurityException, IllegalArgumentException, 475 NoSuchFieldException, IllegalAccessException, InterruptedException { 476 final TableName tableName = 477 TableName.valueOf("testReplayEditsWrittenViaHRegion"); 478 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 479 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); 480 deleteDir(basedir); 481 final byte[] rowName = tableName.getName(); 482 final int countPerFamily = 10; 483 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 484 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 485 HBaseTestingUtility.closeRegionAndWAL(region3); 486 // Write countPerFamily edits into the three families. Do a flush on one 487 // of the families during the load of edits so its seqid is not same as 488 // others to test we do right thing when different seqids. 489 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 490 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 491 long seqid = region.getOpenSeqNum(); 492 boolean first = true; 493 for (HColumnDescriptor hcd: htd.getFamilies()) { 494 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 495 if (first) { 496 // If first, so we have at least one family w/ different seqid to rest. 497 region.flush(true); 498 first = false; 499 } 500 } 501 // Now assert edits made it in. 502 final Get g = new Get(rowName); 503 Result result = region.get(g); 504 assertEquals(countPerFamily * htd.getFamilies().size(), 505 result.size()); 506 // Now close the region (without flush), split the log, reopen the region and assert that 507 // replay of log has the correct effect, that our seqids are calculated correctly so 508 // all edits in logs are seen as 'stale'/old. 509 region.close(true); 510 wal.shutdown(); 511 runWALSplit(this.conf); 512 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 513 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 514 long seqid2 = region2.getOpenSeqNum(); 515 assertTrue(seqid + result.size() < seqid2); 516 final Result result1b = region2.get(g); 517 assertEquals(result.size(), result1b.size()); 518 519 // Next test. Add more edits, then 'crash' this region by stealing its wal 520 // out from under it and assert that replay of the log adds the edits back 521 // correctly when region is opened again. 522 for (HColumnDescriptor hcd: htd.getFamilies()) { 523 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 524 } 525 // Get count of edits. 526 final Result result2 = region2.get(g); 527 assertEquals(2 * result.size(), result2.size()); 528 wal2.sync(); 529 final Configuration newConf = HBaseConfiguration.create(this.conf); 530 User user = HBaseTestingUtility.getDifferentUser(newConf, 531 tableName.getNameAsString()); 532 user.runAs(new PrivilegedExceptionAction<Object>() { 533 @Override 534 public Object run() throws Exception { 535 runWALSplit(newConf); 536 FileSystem newFS = FileSystem.get(newConf); 537 // Make a new wal for new region open. 538 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 539 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 540 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 541 @Override 542 protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) { 543 super.restoreEdit(s, cell, memstoreSizing); 544 countOfRestoredEdits.incrementAndGet(); 545 } 546 }; 547 long seqid3 = region3.initialize(); 548 Result result3 = region3.get(g); 549 // Assert that count of cells is same as before crash. 550 assertEquals(result2.size(), result3.size()); 551 assertEquals(htd.getFamilies().size() * countPerFamily, 552 countOfRestoredEdits.get()); 553 554 // I can't close wal1. Its been appropriated when we split. 555 region3.close(); 556 wal3.close(); 557 return null; 558 } 559 }); 560 } 561 562 /** 563 * Test that we recover correctly when there is a failure in between the 564 * flushes. i.e. Some stores got flushed but others did not. 565 * 566 * Unfortunately, there is no easy hook to flush at a store level. The way 567 * we get around this is by flushing at the region level, and then deleting 568 * the recently flushed store file for one of the Stores. This would put us 569 * back in the situation where all but that store got flushed and the region 570 * died. 571 * 572 * We restart Region again, and verify that the edits were replayed. 573 * 574 * @throws IOException 575 * @throws IllegalAccessException 576 * @throws NoSuchFieldException 577 * @throws IllegalArgumentException 578 * @throws SecurityException 579 */ 580 @Test 581 public void testReplayEditsAfterPartialFlush() 582 throws IOException, SecurityException, IllegalArgumentException, 583 NoSuchFieldException, IllegalAccessException, InterruptedException { 584 final TableName tableName = 585 TableName.valueOf("testReplayEditsWrittenViaHRegion"); 586 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 587 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); 588 deleteDir(basedir); 589 final byte[] rowName = tableName.getName(); 590 final int countPerFamily = 10; 591 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 592 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 593 HBaseTestingUtility.closeRegionAndWAL(region3); 594 // Write countPerFamily edits into the three families. Do a flush on one 595 // of the families during the load of edits so its seqid is not same as 596 // others to test we do right thing when different seqids. 597 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 598 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 599 long seqid = region.getOpenSeqNum(); 600 for (HColumnDescriptor hcd: htd.getFamilies()) { 601 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 602 } 603 604 // Now assert edits made it in. 605 final Get g = new Get(rowName); 606 Result result = region.get(g); 607 assertEquals(countPerFamily * htd.getFamilies().size(), 608 result.size()); 609 610 // Let us flush the region 611 region.flush(true); 612 region.close(true); 613 wal.shutdown(); 614 615 // delete the store files in the second column family to simulate a failure 616 // in between the flushcache(); 617 // we have 3 families. killing the middle one ensures that taking the maximum 618 // will make us fail. 619 int cf_count = 0; 620 for (HColumnDescriptor hcd: htd.getFamilies()) { 621 cf_count++; 622 if (cf_count == 2) { 623 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 624 } 625 } 626 627 628 // Let us try to split and recover 629 runWALSplit(this.conf); 630 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 631 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 632 long seqid2 = region2.getOpenSeqNum(); 633 assertTrue(seqid + result.size() < seqid2); 634 635 final Result result1b = region2.get(g); 636 assertEquals(result.size(), result1b.size()); 637 } 638 639 640 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 641 // Only throws exception if throwExceptionWhenFlushing is set true. 642 public static class CustomStoreFlusher extends DefaultStoreFlusher { 643 // Switch between throw and not throw exception in flush 644 static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 645 646 public CustomStoreFlusher(Configuration conf, HStore store) { 647 super(conf, store); 648 } 649 650 @Override 651 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 652 MonitoredTask status, ThroughputController throughputController, 653 FlushLifeCycleTracker tracker) throws IOException { 654 if (throwExceptionWhenFlushing.get()) { 655 throw new IOException("Simulated exception by tests"); 656 } 657 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); 658 } 659 }; 660 661 /** 662 * Test that we could recover the data correctly after aborting flush. In the 663 * test, first we abort flush after writing some data, then writing more data 664 * and flush again, at last verify the data. 665 * @throws IOException 666 */ 667 @Test 668 public void testReplayEditsAfterAbortingFlush() throws IOException { 669 final TableName tableName = 670 TableName.valueOf("testReplayEditsAfterAbortingFlush"); 671 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 672 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName); 673 deleteDir(basedir); 674 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 675 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 676 HBaseTestingUtility.closeRegionAndWAL(region3); 677 // Write countPerFamily edits into the three families. Do a flush on one 678 // of the families during the load of edits so its seqid is not same as 679 // others to test we do right thing when different seqids. 680 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 681 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 682 Mockito.doReturn(false).when(rsServices).isAborted(); 683 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 684 when(rsServices.getConfiguration()).thenReturn(conf); 685 Configuration customConf = new Configuration(this.conf); 686 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 687 CustomStoreFlusher.class.getName()); 688 HRegion region = 689 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 690 int writtenRowCount = 10; 691 List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies()); 692 for (int i = 0; i < writtenRowCount; i++) { 693 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 694 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 695 Bytes.toBytes("val")); 696 region.put(put); 697 } 698 699 // Now assert edits made it in. 700 RegionScanner scanner = region.getScanner(new Scan()); 701 assertEquals(writtenRowCount, getScannedCount(scanner)); 702 703 // Let us flush the region 704 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 705 try { 706 region.flush(true); 707 fail("Injected exception hasn't been thrown"); 708 } catch (IOException e) { 709 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 710 // simulated to abort server 711 Mockito.doReturn(true).when(rsServices).isAborted(); 712 region.setClosing(false); // region normally does not accept writes after 713 // DroppedSnapshotException. We mock around it for this test. 714 } 715 // writing more data 716 int moreRow = 10; 717 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 718 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 719 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 720 Bytes.toBytes("val")); 721 region.put(put); 722 } 723 writtenRowCount += moreRow; 724 // call flush again 725 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 726 try { 727 region.flush(true); 728 } catch (IOException t) { 729 LOG.info("Expected exception when flushing region because server is stopped," 730 + t.getMessage()); 731 } 732 733 region.close(true); 734 wal.shutdown(); 735 736 // Let us try to split and recover 737 runWALSplit(this.conf); 738 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 739 Mockito.doReturn(false).when(rsServices).isAborted(); 740 HRegion region2 = 741 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 742 scanner = region2.getScanner(new Scan()); 743 assertEquals(writtenRowCount, getScannedCount(scanner)); 744 } 745 746 private int getScannedCount(RegionScanner scanner) throws IOException { 747 int scannedCount = 0; 748 List<Cell> results = new ArrayList<>(); 749 while (true) { 750 boolean existMore = scanner.next(results); 751 if (!results.isEmpty()) 752 scannedCount++; 753 if (!existMore) 754 break; 755 results.clear(); 756 } 757 return scannedCount; 758 } 759 760 /** 761 * Create an HRegion with the result of a WAL split and test we only see the 762 * good edits 763 * @throws Exception 764 */ 765 @Test 766 public void testReplayEditsWrittenIntoWAL() throws Exception { 767 final TableName tableName = 768 TableName.valueOf("testReplayEditsWrittenIntoWAL"); 769 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 770 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 771 final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); 772 deleteDir(basedir); 773 774 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 775 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 776 HBaseTestingUtility.closeRegionAndWAL(region2); 777 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 778 final byte[] rowName = tableName.getName(); 779 final byte[] regionName = hri.getEncodedNameAsBytes(); 780 781 // Add 1k to each family. 782 final int countPerFamily = 1000; 783 Set<byte[]> familyNames = new HashSet<>(); 784 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 785 for(byte[] fam : htd.getFamiliesKeys()) { 786 scopes.put(fam, 0); 787 } 788 for (HColumnDescriptor hcd: htd.getFamilies()) { 789 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, 790 ee, wal, htd, mvcc, scopes); 791 familyNames.add(hcd.getName()); 792 } 793 794 // Add a cache flush, shouldn't have any effect 795 wal.startCacheFlush(regionName, familyNames); 796 wal.completeCacheFlush(regionName); 797 798 // Add an edit to another family, should be skipped. 799 WALEdit edit = new WALEdit(); 800 long now = ee.currentTime(); 801 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, 802 now, rowName)); 803 wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, 804 true); 805 806 // Delete the c family to verify deletes make it over. 807 edit = new WALEdit(); 808 now = ee.currentTime(); 809 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 810 wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, 811 true); 812 813 // Sync. 814 wal.sync(); 815 // Make a new conf and a new fs for the splitter to run on so we can take 816 // over old wal. 817 final Configuration newConf = HBaseConfiguration.create(this.conf); 818 User user = HBaseTestingUtility.getDifferentUser(newConf, 819 ".replay.wal.secondtime"); 820 user.runAs(new PrivilegedExceptionAction<Void>() { 821 @Override 822 public Void run() throws Exception { 823 runWALSplit(newConf); 824 FileSystem newFS = FileSystem.get(newConf); 825 // 100k seems to make for about 4 flushes during HRegion#initialize. 826 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 827 // Make a new wal for new region. 828 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 829 final AtomicInteger flushcount = new AtomicInteger(0); 830 try { 831 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 832 @Override 833 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 834 final Collection<HStore> storesToFlush, MonitoredTask status, 835 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 836 LOG.info("InternalFlushCache Invoked"); 837 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 838 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 839 flushcount.incrementAndGet(); 840 return fs; 841 } 842 }; 843 // The seq id this region has opened up with 844 long seqid = region.initialize(); 845 846 // The mvcc readpoint of from inserting data. 847 long writePoint = mvcc.getWritePoint(); 848 849 // We flushed during init. 850 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); 851 assertTrue((seqid - 1) == writePoint); 852 853 Get get = new Get(rowName); 854 Result result = region.get(get); 855 // Make sure we only see the good edits 856 assertEquals(countPerFamily * (htd.getFamilies().size() - 1), 857 result.size()); 858 region.close(); 859 } finally { 860 newWal.close(); 861 } 862 return null; 863 } 864 }); 865 } 866 867 @Test 868 // the following test is for HBASE-6065 869 public void testSequentialEditLogSeqNum() throws IOException { 870 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 871 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 872 final Path basedir = 873 FSUtils.getWALTableDir(conf, tableName); 874 deleteDir(basedir); 875 final byte[] rowName = tableName.getName(); 876 final int countPerFamily = 10; 877 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 878 879 // Mock the WAL 880 MockWAL wal = createMockWAL(); 881 882 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 883 for (HColumnDescriptor hcd : htd.getFamilies()) { 884 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 885 } 886 887 // Let us flush the region 888 // But this time completeflushcache is not yet done 889 region.flush(true); 890 for (HColumnDescriptor hcd : htd.getFamilies()) { 891 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 892 } 893 long lastestSeqNumber = region.getReadPoint(null); 894 // get the current seq no 895 wal.doCompleteCacheFlush = true; 896 // allow complete cache flush with the previous seq number got after first 897 // set of edits. 898 wal.completeCacheFlush(hri.getEncodedNameAsBytes()); 899 wal.shutdown(); 900 FileStatus[] listStatus = wal.getFiles(); 901 assertNotNull(listStatus); 902 assertTrue(listStatus.length > 0); 903 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], 904 this.fs, this.conf, null, null, null, wals); 905 FileStatus[] listStatus1 = this.fs.listStatus( 906 new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(), 907 "recovered.edits")), new PathFilter() { 908 @Override 909 public boolean accept(Path p) { 910 if (WALSplitter.isSequenceIdFile(p)) { 911 return false; 912 } 913 return true; 914 } 915 }); 916 int editCount = 0; 917 for (FileStatus fileStatus : listStatus1) { 918 editCount = Integer.parseInt(fileStatus.getPath().getName()); 919 } 920 // The sequence number should be same 921 assertEquals( 922 "The sequence number of the recoverd.edits and the current edit seq should be same", 923 lastestSeqNumber, editCount); 924 } 925 926 /** 927 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 928 */ 929 @Test 930 public void testDatalossWhenInputError() throws Exception { 931 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 932 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 933 final Path basedir = FSUtils.getWALTableDir(conf, tableName); 934 deleteDir(basedir); 935 final byte[] rowName = tableName.getName(); 936 final int countPerFamily = 10; 937 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 938 HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 939 Path regionDir = region1.getWALRegionDir(); 940 HBaseTestingUtility.closeRegionAndWAL(region1); 941 942 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 943 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 944 for (HColumnDescriptor hcd : htd.getFamilies()) { 945 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 946 } 947 // Now assert edits made it in. 948 final Get g = new Get(rowName); 949 Result result = region.get(g); 950 assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); 951 // Now close the region (without flush), split the log, reopen the region and assert that 952 // replay of log has the correct effect. 953 region.close(true); 954 wal.shutdown(); 955 956 runWALSplit(this.conf); 957 958 // here we let the DFSInputStream throw an IOException just after the WALHeader. 959 Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); 960 FSDataInputStream stream = fs.open(editFile); 961 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); 962 Class<? extends AbstractFSWALProvider.Reader> logReaderClass = 963 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 964 AbstractFSWALProvider.Reader.class); 965 AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance(); 966 reader.init(this.fs, editFile, conf, stream); 967 final long headerLength = stream.getPos(); 968 reader.close(); 969 FileSystem spyFs = spy(this.fs); 970 doAnswer(new Answer<FSDataInputStream>() { 971 972 @Override 973 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 974 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 975 Field field = FilterInputStream.class.getDeclaredField("in"); 976 field.setAccessible(true); 977 final DFSInputStream in = (DFSInputStream) field.get(stream); 978 DFSInputStream spyIn = spy(in); 979 doAnswer(new Answer<Integer>() { 980 981 private long pos; 982 983 @Override 984 public Integer answer(InvocationOnMock invocation) throws Throwable { 985 if (pos >= headerLength) { 986 throw new IOException("read over limit"); 987 } 988 int b = (Integer) invocation.callRealMethod(); 989 if (b > 0) { 990 pos += b; 991 } 992 return b; 993 } 994 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 995 doAnswer(new Answer<Void>() { 996 997 @Override 998 public Void answer(InvocationOnMock invocation) throws Throwable { 999 invocation.callRealMethod(); 1000 in.close(); 1001 return null; 1002 } 1003 }).when(spyIn).close(); 1004 field.set(stream, spyIn); 1005 return stream; 1006 } 1007 }).when(spyFs).open(eq(editFile)); 1008 1009 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 1010 HRegion region2; 1011 try { 1012 // log replay should fail due to the IOException, otherwise we may lose data. 1013 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 1014 assertEquals(result.size(), region2.get(g).size()); 1015 } catch (IOException e) { 1016 assertEquals("read over limit", e.getMessage()); 1017 } 1018 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 1019 assertEquals(result.size(), region2.get(g).size()); 1020 } 1021 1022 /** 1023 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 1024 */ 1025 private void testNameConflictWhenSplit(boolean largeFirst) throws IOException, 1026 StreamLacksCapabilityException { 1027 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 1028 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 1029 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 1030 final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); 1031 deleteDir(basedir); 1032 1033 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 1034 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1035 for (byte[] fam : htd.getFamiliesKeys()) { 1036 scopes.put(fam, 0); 1037 } 1038 HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 1039 HBaseTestingUtility.closeRegionAndWAL(region); 1040 final byte[] family = htd.getColumnFamilies()[0].getName(); 1041 final byte[] rowName = tableName.getName(); 1042 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 1043 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 1044 1045 Path largeFile = new Path(logDir, "wal-1"); 1046 Path smallFile = new Path(logDir, "wal-2"); 1047 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 1048 writerWALFile(smallFile, Arrays.asList(entry2)); 1049 FileStatus first, second; 1050 if (largeFirst) { 1051 first = fs.getFileStatus(largeFile); 1052 second = fs.getFileStatus(smallFile); 1053 } else { 1054 first = fs.getFileStatus(smallFile); 1055 second = fs.getFileStatus(largeFile); 1056 } 1057 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals); 1058 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals); 1059 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 1060 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1061 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1062 assertEquals(2, region.get(new Get(rowName)).size()); 1063 } 1064 1065 @Test 1066 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1067 testNameConflictWhenSplit(true); 1068 } 1069 1070 @Test 1071 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1072 testNameConflictWhenSplit(false); 1073 } 1074 1075 static class MockWAL extends FSHLog { 1076 boolean doCompleteCacheFlush = false; 1077 1078 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1079 throws IOException { 1080 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1081 } 1082 1083 @Override 1084 public void completeCacheFlush(byte[] encodedRegionName) { 1085 if (!doCompleteCacheFlush) { 1086 return; 1087 } 1088 super.completeCacheFlush(encodedRegionName); 1089 } 1090 } 1091 1092 private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1093 HTableDescriptor htd = new HTableDescriptor(tableName); 1094 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1095 htd.addFamily(a); 1096 return htd; 1097 } 1098 1099 private MockWAL createMockWAL() throws IOException { 1100 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1101 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1102 // long gone. 1103 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1104 return wal; 1105 } 1106 1107 // Flusher used in this test. Keep count of how often we are called and 1108 // actually run the flush inside here. 1109 static class TestFlusher implements FlushRequester { 1110 private HRegion r; 1111 1112 @Override 1113 public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { 1114 try { 1115 r.flush(force); 1116 } catch (IOException e) { 1117 throw new RuntimeException("Exception flushing", e); 1118 } 1119 } 1120 1121 @Override 1122 public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { 1123 } 1124 1125 @Override 1126 public void registerFlushRequestListener(FlushRequestListener listener) { 1127 1128 } 1129 1130 @Override 1131 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1132 return false; 1133 } 1134 1135 @Override 1136 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1137 1138 } 1139 } 1140 1141 private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri, 1142 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1143 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1144 } 1145 1146 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1147 int index) { 1148 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1149 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1150 WALEdit edit = new WALEdit(); 1151 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1152 return edit; 1153 } 1154 1155 private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, 1156 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1157 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1158 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1159 createWALEdit(rowName, family, ee, index), hri, true, null); 1160 entry.stampRegionSequenceId(mvcc.begin()); 1161 return entry; 1162 } 1163 1164 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, 1165 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1166 final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, 1167 NavigableMap<byte[], Integer> scopes) throws IOException { 1168 for (int j = 0; j < count; j++) { 1169 wal.append(hri, createWALKey(tableName, hri, mvcc, scopes), 1170 createWALEdit(rowName, family, ee, j), true); 1171 } 1172 wal.sync(); 1173 } 1174 1175 static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1176 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1177 List<Put> puts = new ArrayList<>(); 1178 for (int j = 0; j < count; j++) { 1179 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1180 Put p = new Put(rowName); 1181 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1182 r.put(p); 1183 puts.add(p); 1184 } 1185 return puts; 1186 } 1187 1188 /* 1189 * Creates an HRI around an HTD that has <code>tableName</code> and three 1190 * column families named 'a','b', and 'c'. 1191 * @param tableName Name of table to use when we create HTableDescriptor. 1192 */ 1193 private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1194 return new HRegionInfo(tableName, null, null, false); 1195 } 1196 1197 /* 1198 * Run the split. Verify only single split file made. 1199 * @param c 1200 * @return The single split file made 1201 * @throws IOException 1202 */ 1203 private Path runWALSplit(final Configuration c) throws IOException { 1204 List<Path> splits = WALSplitter.split( 1205 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1206 // Split should generate only 1 file since there's only 1 region 1207 assertEquals("splits=" + splits, 1, splits.size()); 1208 // Make sure the file exists 1209 assertTrue(fs.exists(splits.get(0))); 1210 LOG.info("Split file=" + splits.get(0)); 1211 return splits.get(0); 1212 } 1213 1214 private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1215 HTableDescriptor htd = new HTableDescriptor(tableName); 1216 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1217 htd.addFamily(a); 1218 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b")); 1219 htd.addFamily(b); 1220 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c")); 1221 htd.addFamily(c); 1222 return htd; 1223 } 1224 1225 private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException, 1226 StreamLacksCapabilityException { 1227 fs.mkdirs(file.getParent()); 1228 ProtobufLogWriter writer = new ProtobufLogWriter(); 1229 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); 1230 for (FSWALEntry entry : entries) { 1231 writer.append(entry); 1232 } 1233 writer.sync(); 1234 writer.close(); 1235 } 1236 1237 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1238 throws IOException; 1239}