001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver.wal; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025import static org.mockito.ArgumentMatchers.any; 026import static org.mockito.ArgumentMatchers.anyInt; 027import static org.mockito.ArgumentMatchers.eq; 028import static org.mockito.Mockito.doAnswer; 029import static org.mockito.Mockito.spy; 030import static org.mockito.Mockito.when; 031 032import java.io.FilterInputStream; 033import java.io.IOException; 034import java.lang.reflect.Field; 035import java.security.PrivilegedExceptionAction; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.Collection; 039import java.util.HashSet; 040import java.util.List; 041import java.util.NavigableMap; 042import java.util.Set; 043import java.util.TreeMap; 044import java.util.concurrent.atomic.AtomicBoolean; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionInfo; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.MiniHBaseCluster; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.ResultScanner; 068import org.apache.hadoop.hbase.client.Scan; 069import org.apache.hadoop.hbase.client.Table; 070import org.apache.hadoop.hbase.monitoring.MonitoredTask; 071import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 072import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 073import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 074import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 075import org.apache.hadoop.hbase.regionserver.FlushRequester; 076import org.apache.hadoop.hbase.regionserver.HRegion; 077import org.apache.hadoop.hbase.regionserver.HRegionServer; 078import org.apache.hadoop.hbase.regionserver.HStore; 079import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 080import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 081import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 082import org.apache.hadoop.hbase.regionserver.Region; 083import org.apache.hadoop.hbase.regionserver.RegionScanner; 084import org.apache.hadoop.hbase.regionserver.RegionServerServices; 085import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 086import org.apache.hadoop.hbase.security.User; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.CommonFSUtils; 089import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 090import org.apache.hadoop.hbase.util.EnvironmentEdge; 091import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 092import org.apache.hadoop.hbase.util.HFileTestUtil; 093import org.apache.hadoop.hbase.util.Pair; 094import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 095import org.apache.hadoop.hbase.wal.WAL; 096import org.apache.hadoop.hbase.wal.WALEdit; 097import org.apache.hadoop.hbase.wal.WALFactory; 098import org.apache.hadoop.hbase.wal.WALKeyImpl; 099import org.apache.hadoop.hbase.wal.WALSplitUtil; 100import org.apache.hadoop.hbase.wal.WALSplitter; 101import org.apache.hadoop.hdfs.DFSInputStream; 102import org.junit.After; 103import org.junit.AfterClass; 104import org.junit.Before; 105import org.junit.BeforeClass; 106import org.junit.Rule; 107import org.junit.Test; 108import org.junit.rules.TestName; 109import org.mockito.Mockito; 110import org.mockito.invocation.InvocationOnMock; 111import org.mockito.stubbing.Answer; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115/** 116 * Test replay of edits out of a WAL split. 117 */ 118public abstract class AbstractTestWALReplay { 119 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 120 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 121 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 122 private Path hbaseRootDir = null; 123 private String logName; 124 private Path oldLogDir; 125 private Path logDir; 126 private FileSystem fs; 127 private Configuration conf; 128 private WALFactory wals; 129 130 @Rule 131 public final TestName currentTest = new TestName(); 132 133 134 @BeforeClass 135 public static void setUpBeforeClass() throws Exception { 136 Configuration conf = TEST_UTIL.getConfiguration(); 137 // The below config supported by 0.20-append and CDH3b2 138 conf.setInt("dfs.client.block.recovery.retries", 2); 139 TEST_UTIL.startMiniCluster(3); 140 Path hbaseRootDir = 141 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 142 LOG.info("hbase.rootdir=" + hbaseRootDir); 143 CommonFSUtils.setRootDir(conf, hbaseRootDir); 144 } 145 146 @AfterClass 147 public static void tearDownAfterClass() throws Exception { 148 TEST_UTIL.shutdownMiniCluster(); 149 } 150 151 @Before 152 public void setUp() throws Exception { 153 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 154 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 155 this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf); 156 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 157 String serverName = 158 ServerName.valueOf(currentTest.getMethodName() + "-manual", 16010, System.currentTimeMillis()) 159 .toString(); 160 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 161 this.logDir = new Path(this.hbaseRootDir, logName); 162 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 163 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 164 } 165 this.wals = new WALFactory(conf, currentTest.getMethodName()); 166 } 167 168 @After 169 public void tearDown() throws Exception { 170 this.wals.close(); 171 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 172 } 173 174 /* 175 * @param p Directory to cleanup 176 */ 177 private void deleteDir(final Path p) throws IOException { 178 if (this.fs.exists(p)) { 179 if (!this.fs.delete(p, true)) { 180 throw new IOException("Failed remove of " + p); 181 } 182 } 183 } 184 185 /** 186 * 187 * @throws Exception 188 */ 189 @Test 190 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 191 final TableName tableName = 192 TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 193 byte[] family1 = Bytes.toBytes("cf1"); 194 byte[] family2 = Bytes.toBytes("cf2"); 195 byte[] qualifier = Bytes.toBytes("q"); 196 byte[] value = Bytes.toBytes("testV"); 197 byte[][] familys = { family1, family2 }; 198 TEST_UTIL.createTable(tableName, familys); 199 Table htable = TEST_UTIL.getConnection().getTable(tableName); 200 Put put = new Put(Bytes.toBytes("r1")); 201 put.addColumn(family1, qualifier, value); 202 htable.put(put); 203 ResultScanner resultScanner = htable.getScanner(new Scan()); 204 int count = 0; 205 while (resultScanner.next() != null) { 206 count++; 207 } 208 resultScanner.close(); 209 assertEquals(1, count); 210 211 MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 212 List<HRegion> regions = hbaseCluster.getRegions(tableName); 213 assertEquals(1, regions.size()); 214 215 // move region to another regionserver 216 Region destRegion = regions.get(0); 217 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 218 assertTrue("Please start more than 1 regionserver", 219 hbaseCluster.getRegionServerThreads().size() > 1); 220 int destServerNum = 0; 221 while (destServerNum == originServerNum) { 222 destServerNum++; 223 } 224 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 225 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 226 // move region to destination regionserver 227 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 228 229 // delete the row 230 Delete del = new Delete(Bytes.toBytes("r1")); 231 htable.delete(del); 232 resultScanner = htable.getScanner(new Scan()); 233 count = 0; 234 while (resultScanner.next() != null) { 235 count++; 236 } 237 resultScanner.close(); 238 assertEquals(0, count); 239 240 // flush region and make major compaction 241 HRegion region = 242 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 243 region.flush(true); 244 // wait to complete major compaction 245 for (HStore store : region.getStores()) { 246 store.triggerMajorCompaction(); 247 } 248 region.compact(true); 249 250 // move region to origin regionserver 251 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 252 // abort the origin regionserver 253 originServer.abort("testing"); 254 255 // see what we get 256 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 257 if (result != null) { 258 assertTrue("Row is deleted, but we get" + result.toString(), 259 (result == null) || result.isEmpty()); 260 } 261 resultScanner.close(); 262 } 263 264 /** 265 * Tests for hbase-2727. 266 * @throws Exception 267 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 268 */ 269 @Test 270 public void test2727() throws Exception { 271 // Test being able to have > 1 set of edits in the recovered.edits directory. 272 // Ensure edits are replayed properly. 273 final TableName tableName = 274 TableName.valueOf("test2727"); 275 276 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 277 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 278 Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 279 deleteDir(basedir); 280 281 HTableDescriptor htd = createBasic3FamilyHTD(tableName); 282 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 283 HBaseTestingUtility.closeRegionAndWAL(region2); 284 final byte [] rowName = tableName.getName(); 285 286 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 287 // Add 1k to each family. 288 final int countPerFamily = 1000; 289 290 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 291 for(byte[] fam : htd.getFamiliesKeys()) { 292 scopes.put(fam, 0); 293 } 294 for (HColumnDescriptor hcd: htd.getFamilies()) { 295 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, 296 wal1, htd, mvcc, scopes); 297 } 298 wal1.shutdown(); 299 runWALSplit(this.conf); 300 301 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 302 // Add 1k to each family. 303 for (HColumnDescriptor hcd: htd.getFamilies()) { 304 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, 305 ee, wal2, htd, mvcc, scopes); 306 } 307 wal2.shutdown(); 308 runWALSplit(this.conf); 309 310 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 311 try { 312 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); 313 long seqid = region.getOpenSeqNum(); 314 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 315 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 316 assertTrue(seqid > mvcc.getWritePoint()); 317 assertEquals(seqid - 1, mvcc.getWritePoint()); 318 LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " 319 + mvcc.getReadPoint()); 320 321 // TODO: Scan all. 322 region.close(); 323 } finally { 324 wal3.close(); 325 } 326 } 327 328 /** 329 * Test case of HRegion that is only made out of bulk loaded files. Assert 330 * that we don't 'crash'. 331 * @throws IOException 332 * @throws IllegalAccessException 333 * @throws NoSuchFieldException 334 * @throws IllegalArgumentException 335 * @throws SecurityException 336 */ 337 @Test 338 public void testRegionMadeOfBulkLoadedFilesOnly() 339 throws IOException, SecurityException, IllegalArgumentException, 340 NoSuchFieldException, IllegalAccessException, InterruptedException { 341 final TableName tableName = 342 TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 343 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 344 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 345 deleteDir(basedir); 346 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 347 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 348 HBaseTestingUtility.closeRegionAndWAL(region2); 349 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 350 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 351 352 byte [] family = htd.getFamilies().iterator().next().getName(); 353 Path f = new Path(basedir, "hfile"); 354 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 355 Bytes.toBytes("z"), 10); 356 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 357 hfs.add(Pair.newPair(family, f.toString())); 358 region.bulkLoadHFiles(hfs, true, null); 359 360 // Add an edit so something in the WAL 361 byte[] row = tableName.getName(); 362 region.put((new Put(row)).addColumn(family, family, family)); 363 wal.sync(); 364 final int rowsInsertedCount = 11; 365 366 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 367 368 // Now 'crash' the region by stealing its wal 369 final Configuration newConf = HBaseConfiguration.create(this.conf); 370 User user = HBaseTestingUtility.getDifferentUser(newConf, 371 tableName.getNameAsString()); 372 user.runAs(new PrivilegedExceptionAction() { 373 @Override 374 public Object run() throws Exception { 375 runWALSplit(newConf); 376 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 377 378 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), 379 hbaseRootDir, hri, htd, wal2); 380 long seqid2 = region2.getOpenSeqNum(); 381 assertTrue(seqid2 > -1); 382 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 383 384 // I can't close wal1. Its been appropriated when we split. 385 region2.close(); 386 wal2.close(); 387 return null; 388 } 389 }); 390 } 391 392 /** 393 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 394 * files) and an edit in the memstore. 395 * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries 396 * from being replayed" 397 * @throws IOException 398 * @throws IllegalAccessException 399 * @throws NoSuchFieldException 400 * @throws IllegalArgumentException 401 * @throws SecurityException 402 */ 403 @Test 404 public void testCompactedBulkLoadedFiles() 405 throws IOException, SecurityException, IllegalArgumentException, 406 NoSuchFieldException, IllegalAccessException, InterruptedException { 407 final TableName tableName = 408 TableName.valueOf("testCompactedBulkLoadedFiles"); 409 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 410 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 411 deleteDir(basedir); 412 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 413 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 414 HBaseTestingUtility.closeRegionAndWAL(region2); 415 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 416 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 417 418 // Add an edit so something in the WAL 419 byte [] row = tableName.getName(); 420 byte [] family = htd.getFamilies().iterator().next().getName(); 421 region.put((new Put(row)).addColumn(family, family, family)); 422 wal.sync(); 423 424 List <Pair<byte[],String>> hfs= new ArrayList<>(1); 425 for (int i = 0; i < 3; i++) { 426 Path f = new Path(basedir, "hfile"+i); 427 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 428 Bytes.toBytes(i + "50"), 10); 429 hfs.add(Pair.newPair(family, f.toString())); 430 } 431 region.bulkLoadHFiles(hfs, true, null); 432 final int rowsInsertedCount = 31; 433 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 434 435 // major compact to turn all the bulk loaded files into one normal file 436 region.compact(true); 437 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 438 439 // Now 'crash' the region by stealing its wal 440 final Configuration newConf = HBaseConfiguration.create(this.conf); 441 User user = HBaseTestingUtility.getDifferentUser(newConf, 442 tableName.getNameAsString()); 443 user.runAs(new PrivilegedExceptionAction() { 444 @Override 445 public Object run() throws Exception { 446 runWALSplit(newConf); 447 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 448 449 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf), 450 hbaseRootDir, hri, htd, wal2); 451 long seqid2 = region2.getOpenSeqNum(); 452 assertTrue(seqid2 > -1); 453 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 454 455 // I can't close wal1. Its been appropriated when we split. 456 region2.close(); 457 wal2.close(); 458 return null; 459 } 460 }); 461 } 462 463 464 /** 465 * Test writing edits into an HRegion, closing it, splitting logs, opening 466 * Region again. Verify seqids. 467 * @throws IOException 468 * @throws IllegalAccessException 469 * @throws NoSuchFieldException 470 * @throws IllegalArgumentException 471 * @throws SecurityException 472 */ 473 @Test 474 public void testReplayEditsWrittenViaHRegion() 475 throws IOException, SecurityException, IllegalArgumentException, 476 NoSuchFieldException, IllegalAccessException, InterruptedException { 477 final TableName tableName = 478 TableName.valueOf("testReplayEditsWrittenViaHRegion"); 479 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 480 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 481 deleteDir(basedir); 482 final byte[] rowName = tableName.getName(); 483 final int countPerFamily = 10; 484 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 485 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 486 HBaseTestingUtility.closeRegionAndWAL(region3); 487 // Write countPerFamily edits into the three families. Do a flush on one 488 // of the families during the load of edits so its seqid is not same as 489 // others to test we do right thing when different seqids. 490 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 491 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 492 long seqid = region.getOpenSeqNum(); 493 boolean first = true; 494 for (HColumnDescriptor hcd: htd.getFamilies()) { 495 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 496 if (first) { 497 // If first, so we have at least one family w/ different seqid to rest. 498 region.flush(true); 499 first = false; 500 } 501 } 502 // Now assert edits made it in. 503 final Get g = new Get(rowName); 504 Result result = region.get(g); 505 assertEquals(countPerFamily * htd.getFamilies().size(), 506 result.size()); 507 // Now close the region (without flush), split the log, reopen the region and assert that 508 // replay of log has the correct effect, that our seqids are calculated correctly so 509 // all edits in logs are seen as 'stale'/old. 510 region.close(true); 511 wal.shutdown(); 512 runWALSplit(this.conf); 513 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 514 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 515 long seqid2 = region2.getOpenSeqNum(); 516 assertTrue(seqid + result.size() < seqid2); 517 final Result result1b = region2.get(g); 518 assertEquals(result.size(), result1b.size()); 519 520 // Next test. Add more edits, then 'crash' this region by stealing its wal 521 // out from under it and assert that replay of the log adds the edits back 522 // correctly when region is opened again. 523 for (HColumnDescriptor hcd: htd.getFamilies()) { 524 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 525 } 526 // Get count of edits. 527 final Result result2 = region2.get(g); 528 assertEquals(2 * result.size(), result2.size()); 529 wal2.sync(); 530 final Configuration newConf = HBaseConfiguration.create(this.conf); 531 User user = HBaseTestingUtility.getDifferentUser(newConf, 532 tableName.getNameAsString()); 533 user.runAs(new PrivilegedExceptionAction<Object>() { 534 @Override 535 public Object run() throws Exception { 536 runWALSplit(newConf); 537 FileSystem newFS = FileSystem.get(newConf); 538 // Make a new wal for new region open. 539 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 540 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 541 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 542 @Override 543 protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) { 544 super.restoreEdit(s, cell, memstoreSizing); 545 countOfRestoredEdits.incrementAndGet(); 546 } 547 }; 548 long seqid3 = region3.initialize(); 549 Result result3 = region3.get(g); 550 // Assert that count of cells is same as before crash. 551 assertEquals(result2.size(), result3.size()); 552 assertEquals(htd.getFamilies().size() * countPerFamily, 553 countOfRestoredEdits.get()); 554 555 // I can't close wal1. Its been appropriated when we split. 556 region3.close(); 557 wal3.close(); 558 return null; 559 } 560 }); 561 } 562 563 /** 564 * Test that we recover correctly when there is a failure in between the 565 * flushes. i.e. Some stores got flushed but others did not. 566 * 567 * Unfortunately, there is no easy hook to flush at a store level. The way 568 * we get around this is by flushing at the region level, and then deleting 569 * the recently flushed store file for one of the Stores. This would put us 570 * back in the situation where all but that store got flushed and the region 571 * died. 572 * 573 * We restart Region again, and verify that the edits were replayed. 574 * 575 * @throws IOException 576 * @throws IllegalAccessException 577 * @throws NoSuchFieldException 578 * @throws IllegalArgumentException 579 * @throws SecurityException 580 */ 581 @Test 582 public void testReplayEditsAfterPartialFlush() 583 throws IOException, SecurityException, IllegalArgumentException, 584 NoSuchFieldException, IllegalAccessException, InterruptedException { 585 final TableName tableName = 586 TableName.valueOf("testReplayEditsWrittenViaHRegion"); 587 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 588 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 589 deleteDir(basedir); 590 final byte[] rowName = tableName.getName(); 591 final int countPerFamily = 10; 592 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 593 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 594 HBaseTestingUtility.closeRegionAndWAL(region3); 595 // Write countPerFamily edits into the three families. Do a flush on one 596 // of the families during the load of edits so its seqid is not same as 597 // others to test we do right thing when different seqids. 598 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 599 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 600 long seqid = region.getOpenSeqNum(); 601 for (HColumnDescriptor hcd: htd.getFamilies()) { 602 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 603 } 604 605 // Now assert edits made it in. 606 final Get g = new Get(rowName); 607 Result result = region.get(g); 608 assertEquals(countPerFamily * htd.getFamilies().size(), 609 result.size()); 610 611 // Let us flush the region 612 region.flush(true); 613 region.close(true); 614 wal.shutdown(); 615 616 // delete the store files in the second column family to simulate a failure 617 // in between the flushcache(); 618 // we have 3 families. killing the middle one ensures that taking the maximum 619 // will make us fail. 620 int cf_count = 0; 621 for (HColumnDescriptor hcd: htd.getFamilies()) { 622 cf_count++; 623 if (cf_count == 2) { 624 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 625 } 626 } 627 628 629 // Let us try to split and recover 630 runWALSplit(this.conf); 631 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 632 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 633 long seqid2 = region2.getOpenSeqNum(); 634 assertTrue(seqid + result.size() < seqid2); 635 636 final Result result1b = region2.get(g); 637 assertEquals(result.size(), result1b.size()); 638 } 639 640 641 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 642 // Only throws exception if throwExceptionWhenFlushing is set true. 643 public static class CustomStoreFlusher extends DefaultStoreFlusher { 644 // Switch between throw and not throw exception in flush 645 public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 646 647 public CustomStoreFlusher(Configuration conf, HStore store) { 648 super(conf, store); 649 } 650 651 @Override 652 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 653 MonitoredTask status, ThroughputController throughputController, 654 FlushLifeCycleTracker tracker) throws IOException { 655 if (throwExceptionWhenFlushing.get()) { 656 throw new IOException("Simulated exception by tests"); 657 } 658 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); 659 } 660 }; 661 662 /** 663 * Test that we could recover the data correctly after aborting flush. In the 664 * test, first we abort flush after writing some data, then writing more data 665 * and flush again, at last verify the data. 666 * @throws IOException 667 */ 668 @Test 669 public void testReplayEditsAfterAbortingFlush() throws IOException { 670 final TableName tableName = 671 TableName.valueOf("testReplayEditsAfterAbortingFlush"); 672 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 673 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 674 deleteDir(basedir); 675 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 676 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 677 HBaseTestingUtility.closeRegionAndWAL(region3); 678 // Write countPerFamily edits into the three families. Do a flush on one 679 // of the families during the load of edits so its seqid is not same as 680 // others to test we do right thing when different seqids. 681 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 682 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 683 Mockito.doReturn(false).when(rsServices).isAborted(); 684 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 685 when(rsServices.getConfiguration()).thenReturn(conf); 686 Configuration customConf = new Configuration(this.conf); 687 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 688 CustomStoreFlusher.class.getName()); 689 HRegion region = 690 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 691 int writtenRowCount = 10; 692 List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies()); 693 for (int i = 0; i < writtenRowCount; i++) { 694 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 695 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 696 Bytes.toBytes("val")); 697 region.put(put); 698 } 699 700 // Now assert edits made it in. 701 RegionScanner scanner = region.getScanner(new Scan()); 702 assertEquals(writtenRowCount, getScannedCount(scanner)); 703 704 // Let us flush the region 705 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 706 try { 707 region.flush(true); 708 fail("Injected exception hasn't been thrown"); 709 } catch (IOException e) { 710 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 711 // simulated to abort server 712 Mockito.doReturn(true).when(rsServices).isAborted(); 713 region.setClosing(false); // region normally does not accept writes after 714 // DroppedSnapshotException. We mock around it for this test. 715 } 716 // writing more data 717 int moreRow = 10; 718 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 719 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 720 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 721 Bytes.toBytes("val")); 722 region.put(put); 723 } 724 writtenRowCount += moreRow; 725 // call flush again 726 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 727 try { 728 region.flush(true); 729 } catch (IOException t) { 730 LOG.info("Expected exception when flushing region because server is stopped," 731 + t.getMessage()); 732 } 733 734 region.close(true); 735 wal.shutdown(); 736 737 // Let us try to split and recover 738 runWALSplit(this.conf); 739 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 740 Mockito.doReturn(false).when(rsServices).isAborted(); 741 HRegion region2 = 742 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 743 scanner = region2.getScanner(new Scan()); 744 assertEquals(writtenRowCount, getScannedCount(scanner)); 745 } 746 747 private int getScannedCount(RegionScanner scanner) throws IOException { 748 int scannedCount = 0; 749 List<Cell> results = new ArrayList<>(); 750 while (true) { 751 boolean existMore = scanner.next(results); 752 if (!results.isEmpty()) 753 scannedCount++; 754 if (!existMore) 755 break; 756 results.clear(); 757 } 758 return scannedCount; 759 } 760 761 /** 762 * Create an HRegion with the result of a WAL split and test we only see the 763 * good edits 764 * @throws Exception 765 */ 766 @Test 767 public void testReplayEditsWrittenIntoWAL() throws Exception { 768 final TableName tableName = 769 TableName.valueOf("testReplayEditsWrittenIntoWAL"); 770 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 771 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 772 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 773 deleteDir(basedir); 774 775 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 776 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 777 HBaseTestingUtility.closeRegionAndWAL(region2); 778 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 779 final byte[] rowName = tableName.getName(); 780 final byte[] regionName = hri.getEncodedNameAsBytes(); 781 782 // Add 1k to each family. 783 final int countPerFamily = 1000; 784 Set<byte[]> familyNames = new HashSet<>(); 785 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 786 for(byte[] fam : htd.getFamiliesKeys()) { 787 scopes.put(fam, 0); 788 } 789 for (HColumnDescriptor hcd: htd.getFamilies()) { 790 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, 791 ee, wal, htd, mvcc, scopes); 792 familyNames.add(hcd.getName()); 793 } 794 795 // Add a cache flush, shouldn't have any effect 796 wal.startCacheFlush(regionName, familyNames); 797 wal.completeCacheFlush(regionName); 798 799 // Add an edit to another family, should be skipped. 800 WALEdit edit = new WALEdit(); 801 long now = ee.currentTime(); 802 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, 803 now, rowName)); 804 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 805 edit); 806 807 // Delete the c family to verify deletes make it over. 808 edit = new WALEdit(); 809 now = ee.currentTime(); 810 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 811 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 812 edit); 813 814 // Sync. 815 wal.sync(); 816 // Make a new conf and a new fs for the splitter to run on so we can take 817 // over old wal. 818 final Configuration newConf = HBaseConfiguration.create(this.conf); 819 User user = HBaseTestingUtility.getDifferentUser(newConf, 820 ".replay.wal.secondtime"); 821 user.runAs(new PrivilegedExceptionAction<Void>() { 822 @Override 823 public Void run() throws Exception { 824 runWALSplit(newConf); 825 FileSystem newFS = FileSystem.get(newConf); 826 // 100k seems to make for about 4 flushes during HRegion#initialize. 827 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 828 // Make a new wal for new region. 829 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 830 final AtomicInteger flushcount = new AtomicInteger(0); 831 try { 832 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 833 @Override 834 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 835 final Collection<HStore> storesToFlush, MonitoredTask status, 836 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 837 LOG.info("InternalFlushCache Invoked"); 838 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 839 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 840 flushcount.incrementAndGet(); 841 return fs; 842 } 843 }; 844 // The seq id this region has opened up with 845 long seqid = region.initialize(); 846 847 // The mvcc readpoint of from inserting data. 848 long writePoint = mvcc.getWritePoint(); 849 850 // We flushed during init. 851 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); 852 assertTrue((seqid - 1) == writePoint); 853 854 Get get = new Get(rowName); 855 Result result = region.get(get); 856 // Make sure we only see the good edits 857 assertEquals(countPerFamily * (htd.getFamilies().size() - 1), 858 result.size()); 859 region.close(); 860 } finally { 861 newWal.close(); 862 } 863 return null; 864 } 865 }); 866 } 867 868 @Test 869 // the following test is for HBASE-6065 870 public void testSequentialEditLogSeqNum() throws IOException { 871 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 872 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 873 final Path basedir = 874 CommonFSUtils.getWALTableDir(conf, tableName); 875 deleteDir(basedir); 876 final byte[] rowName = tableName.getName(); 877 final int countPerFamily = 10; 878 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 879 880 // Mock the WAL 881 MockWAL wal = createMockWAL(); 882 883 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 884 for (HColumnDescriptor hcd : htd.getFamilies()) { 885 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 886 } 887 888 // Let us flush the region 889 // But this time completeflushcache is not yet done 890 region.flush(true); 891 for (HColumnDescriptor hcd : htd.getFamilies()) { 892 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 893 } 894 long lastestSeqNumber = region.getReadPoint(null); 895 // get the current seq no 896 wal.doCompleteCacheFlush = true; 897 // allow complete cache flush with the previous seq number got after first 898 // set of edits. 899 wal.completeCacheFlush(hri.getEncodedNameAsBytes()); 900 wal.shutdown(); 901 FileStatus[] listStatus = wal.getFiles(); 902 assertNotNull(listStatus); 903 assertTrue(listStatus.length > 0); 904 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, 905 wals, null); 906 FileStatus[] listStatus1 = 907 this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName), 908 new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { 909 @Override 910 public boolean accept(Path p) { 911 return !WALSplitUtil.isSequenceIdFile(p); 912 } 913 }); 914 int editCount = 0; 915 for (FileStatus fileStatus : listStatus1) { 916 editCount = Integer.parseInt(fileStatus.getPath().getName()); 917 } 918 // The sequence number should be same 919 assertEquals( 920 "The sequence number of the recoverd.edits and the current edit seq should be same", 921 lastestSeqNumber, editCount); 922 } 923 924 /** 925 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 926 */ 927 @Test 928 public void testDatalossWhenInputError() throws Exception { 929 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 930 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 931 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 932 deleteDir(basedir); 933 final byte[] rowName = tableName.getName(); 934 final int countPerFamily = 10; 935 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 936 HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 937 Path regionDir = region1.getWALRegionDir(); 938 HBaseTestingUtility.closeRegionAndWAL(region1); 939 940 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 941 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 942 for (HColumnDescriptor hcd : htd.getFamilies()) { 943 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 944 } 945 // Now assert edits made it in. 946 final Get g = new Get(rowName); 947 Result result = region.get(g); 948 assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); 949 // Now close the region (without flush), split the log, reopen the region and assert that 950 // replay of log has the correct effect. 951 region.close(true); 952 wal.shutdown(); 953 954 runWALSplit(this.conf); 955 956 // here we let the DFSInputStream throw an IOException just after the WALHeader. 957 Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); 958 FSDataInputStream stream = fs.open(editFile); 959 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); 960 Class<? extends AbstractFSWALProvider.Reader> logReaderClass = 961 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 962 AbstractFSWALProvider.Reader.class); 963 AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance(); 964 reader.init(this.fs, editFile, conf, stream); 965 final long headerLength = stream.getPos(); 966 reader.close(); 967 FileSystem spyFs = spy(this.fs); 968 doAnswer(new Answer<FSDataInputStream>() { 969 970 @Override 971 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 972 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 973 Field field = FilterInputStream.class.getDeclaredField("in"); 974 field.setAccessible(true); 975 final DFSInputStream in = (DFSInputStream) field.get(stream); 976 DFSInputStream spyIn = spy(in); 977 doAnswer(new Answer<Integer>() { 978 979 private long pos; 980 981 @Override 982 public Integer answer(InvocationOnMock invocation) throws Throwable { 983 if (pos >= headerLength) { 984 throw new IOException("read over limit"); 985 } 986 int b = (Integer) invocation.callRealMethod(); 987 if (b > 0) { 988 pos += b; 989 } 990 return b; 991 } 992 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 993 doAnswer(new Answer<Void>() { 994 995 @Override 996 public Void answer(InvocationOnMock invocation) throws Throwable { 997 invocation.callRealMethod(); 998 in.close(); 999 return null; 1000 } 1001 }).when(spyIn).close(); 1002 field.set(stream, spyIn); 1003 return stream; 1004 } 1005 }).when(spyFs).open(eq(editFile)); 1006 1007 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 1008 HRegion region2; 1009 try { 1010 // log replay should fail due to the IOException, otherwise we may lose data. 1011 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 1012 assertEquals(result.size(), region2.get(g).size()); 1013 } catch (IOException e) { 1014 assertEquals("read over limit", e.getMessage()); 1015 } 1016 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 1017 assertEquals(result.size(), region2.get(g).size()); 1018 } 1019 1020 /** 1021 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 1022 */ 1023 private void testNameConflictWhenSplit(boolean largeFirst) throws IOException, 1024 StreamLacksCapabilityException { 1025 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 1026 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 1027 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 1028 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 1029 deleteDir(basedir); 1030 1031 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 1032 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1033 for (byte[] fam : htd.getFamiliesKeys()) { 1034 scopes.put(fam, 0); 1035 } 1036 HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 1037 HBaseTestingUtility.closeRegionAndWAL(region); 1038 final byte[] family = htd.getColumnFamilies()[0].getName(); 1039 final byte[] rowName = tableName.getName(); 1040 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 1041 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 1042 1043 Path largeFile = new Path(logDir, "wal-1"); 1044 Path smallFile = new Path(logDir, "wal-2"); 1045 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 1046 writerWALFile(smallFile, Arrays.asList(entry2)); 1047 FileStatus first, second; 1048 if (largeFirst) { 1049 first = fs.getFileStatus(largeFile); 1050 second = fs.getFileStatus(smallFile); 1051 } else { 1052 first = fs.getFileStatus(smallFile); 1053 second = fs.getFileStatus(largeFile); 1054 } 1055 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); 1056 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); 1057 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 1058 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1059 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1060 assertEquals(2, region.get(new Get(rowName)).size()); 1061 } 1062 1063 @Test 1064 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1065 testNameConflictWhenSplit(true); 1066 } 1067 1068 @Test 1069 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1070 testNameConflictWhenSplit(false); 1071 } 1072 1073 static class MockWAL extends FSHLog { 1074 boolean doCompleteCacheFlush = false; 1075 1076 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1077 throws IOException { 1078 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1079 } 1080 1081 @Override 1082 public void completeCacheFlush(byte[] encodedRegionName) { 1083 if (!doCompleteCacheFlush) { 1084 return; 1085 } 1086 super.completeCacheFlush(encodedRegionName); 1087 } 1088 } 1089 1090 private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1091 HTableDescriptor htd = new HTableDescriptor(tableName); 1092 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1093 htd.addFamily(a); 1094 return htd; 1095 } 1096 1097 private MockWAL createMockWAL() throws IOException { 1098 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1099 wal.init(); 1100 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1101 // long gone. 1102 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1103 return wal; 1104 } 1105 1106 // Flusher used in this test. Keep count of how often we are called and 1107 // actually run the flush inside here. 1108 static class TestFlusher implements FlushRequester { 1109 private HRegion r; 1110 1111 @Override 1112 public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { 1113 try { 1114 r.flush(force); 1115 return true; 1116 } catch (IOException e) { 1117 throw new RuntimeException("Exception flushing", e); 1118 } 1119 } 1120 1121 @Override 1122 public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { 1123 return true; 1124 } 1125 1126 @Override 1127 public void registerFlushRequestListener(FlushRequestListener listener) { 1128 1129 } 1130 1131 @Override 1132 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1133 return false; 1134 } 1135 1136 @Override 1137 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1138 1139 } 1140 } 1141 1142 private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri, 1143 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1144 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1145 } 1146 1147 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1148 int index) { 1149 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1150 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1151 WALEdit edit = new WALEdit(); 1152 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1153 return edit; 1154 } 1155 1156 private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, 1157 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1158 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1159 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1160 createWALEdit(rowName, family, ee, index), hri, true, null); 1161 entry.stampRegionSequenceId(mvcc.begin()); 1162 return entry; 1163 } 1164 1165 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, 1166 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1167 final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, 1168 NavigableMap<byte[], Integer> scopes) throws IOException { 1169 for (int j = 0; j < count; j++) { 1170 wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), 1171 createWALEdit(rowName, family, ee, j)); 1172 } 1173 wal.sync(); 1174 } 1175 1176 public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1177 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1178 List<Put> puts = new ArrayList<>(); 1179 for (int j = 0; j < count; j++) { 1180 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1181 Put p = new Put(rowName); 1182 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1183 r.put(p); 1184 puts.add(p); 1185 } 1186 return puts; 1187 } 1188 1189 /* 1190 * Creates an HRI around an HTD that has <code>tableName</code> and three 1191 * column families named 'a','b', and 'c'. 1192 * @param tableName Name of table to use when we create HTableDescriptor. 1193 */ 1194 private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1195 return new HRegionInfo(tableName, null, null, false); 1196 } 1197 1198 /* 1199 * Run the split. Verify only single split file made. 1200 * @param c 1201 * @return The single split file made 1202 * @throws IOException 1203 */ 1204 private Path runWALSplit(final Configuration c) throws IOException { 1205 List<Path> splits = WALSplitter.split( 1206 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1207 // Split should generate only 1 file since there's only 1 region 1208 assertEquals("splits=" + splits, 1, splits.size()); 1209 // Make sure the file exists 1210 assertTrue(fs.exists(splits.get(0))); 1211 LOG.info("Split file=" + splits.get(0)); 1212 return splits.get(0); 1213 } 1214 1215 private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1216 HTableDescriptor htd = new HTableDescriptor(tableName); 1217 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1218 htd.addFamily(a); 1219 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b")); 1220 htd.addFamily(b); 1221 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c")); 1222 htd.addFamily(c); 1223 return htd; 1224 } 1225 1226 private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException, 1227 StreamLacksCapabilityException { 1228 fs.mkdirs(file.getParent()); 1229 ProtobufLogWriter writer = new ProtobufLogWriter(); 1230 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); 1231 for (FSWALEntry entry : entries) { 1232 writer.append(entry); 1233 } 1234 writer.sync(false); 1235 writer.close(); 1236 } 1237 1238 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1239 throws IOException; 1240}