001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyInt; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.spy; 029import static org.mockito.Mockito.when; 030 031import java.io.FilterInputStream; 032import java.io.IOException; 033import java.lang.reflect.Field; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.HashSet; 039import java.util.List; 040import java.util.NavigableMap; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicInteger; 045import java.util.function.Consumer; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtility; 055import org.apache.hadoop.hbase.HColumnDescriptor; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.HRegionInfo; 058import org.apache.hadoop.hbase.HTableDescriptor; 059import org.apache.hadoop.hbase.KeyValue; 060import org.apache.hadoop.hbase.MiniHBaseCluster; 061import org.apache.hadoop.hbase.ServerName; 062import org.apache.hadoop.hbase.TableName; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.ResultScanner; 068import org.apache.hadoop.hbase.client.Scan; 069import org.apache.hadoop.hbase.client.Table; 070import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 071import org.apache.hadoop.hbase.monitoring.MonitoredTask; 072import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 073import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 074import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 075import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 076import org.apache.hadoop.hbase.regionserver.FlushRequester; 077import org.apache.hadoop.hbase.regionserver.HRegion; 078import org.apache.hadoop.hbase.regionserver.HRegionServer; 079import org.apache.hadoop.hbase.regionserver.HStore; 080import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 081import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 082import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 083import org.apache.hadoop.hbase.regionserver.Region; 084import org.apache.hadoop.hbase.regionserver.RegionScanner; 085import org.apache.hadoop.hbase.regionserver.RegionServerServices; 086import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 087import org.apache.hadoop.hbase.security.User; 088import org.apache.hadoop.hbase.util.Bytes; 089import org.apache.hadoop.hbase.util.CommonFSUtils; 090import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 091import org.apache.hadoop.hbase.util.EnvironmentEdge; 092import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 093import org.apache.hadoop.hbase.util.HFileTestUtil; 094import org.apache.hadoop.hbase.util.Pair; 095import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 096import org.apache.hadoop.hbase.wal.WAL; 097import org.apache.hadoop.hbase.wal.WALEdit; 098import org.apache.hadoop.hbase.wal.WALFactory; 099import org.apache.hadoop.hbase.wal.WALKeyImpl; 100import org.apache.hadoop.hbase.wal.WALSplitUtil; 101import org.apache.hadoop.hbase.wal.WALSplitter; 102import org.apache.hadoop.hdfs.DFSInputStream; 103import org.junit.After; 104import org.junit.AfterClass; 105import org.junit.Before; 106import org.junit.BeforeClass; 107import org.junit.Rule; 108import org.junit.Test; 109import org.junit.rules.TestName; 110import org.mockito.Mockito; 111import org.mockito.invocation.InvocationOnMock; 112import org.mockito.stubbing.Answer; 113import org.slf4j.Logger; 114import org.slf4j.LoggerFactory; 115 116/** 117 * Test replay of edits out of a WAL split. 118 */ 119public abstract class AbstractTestWALReplay { 120 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 121 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 122 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 123 private Path hbaseRootDir = null; 124 private String logName; 125 private Path oldLogDir; 126 private Path logDir; 127 private FileSystem fs; 128 private Configuration conf; 129 private WALFactory wals; 130 131 @Rule 132 public final TestName currentTest = new TestName(); 133 134 @BeforeClass 135 public static void setUpBeforeClass() throws Exception { 136 Configuration conf = TEST_UTIL.getConfiguration(); 137 // The below config supported by 0.20-append and CDH3b2 138 conf.setInt("dfs.client.block.recovery.retries", 2); 139 TEST_UTIL.startMiniCluster(3); 140 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 141 LOG.info("hbase.rootdir=" + hbaseRootDir); 142 CommonFSUtils.setRootDir(conf, hbaseRootDir); 143 } 144 145 @AfterClass 146 public static void tearDownAfterClass() throws Exception { 147 TEST_UTIL.shutdownMiniCluster(); 148 } 149 150 @Before 151 public void setUp() throws Exception { 152 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 153 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 154 this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf); 155 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 156 String serverName = ServerName 157 .valueOf(currentTest.getMethodName() + "-manual", 16010, ee.currentTime()).toString(); 158 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 159 this.logDir = new Path(this.hbaseRootDir, logName); 160 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 161 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 162 } 163 this.wals = new WALFactory(conf, currentTest.getMethodName()); 164 } 165 166 @After 167 public void tearDown() throws Exception { 168 this.wals.close(); 169 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 170 } 171 172 /* 173 * @param p Directory to cleanup 174 */ 175 private void deleteDir(final Path p) throws IOException { 176 if (this.fs.exists(p)) { 177 if (!this.fs.delete(p, true)) { 178 throw new IOException("Failed remove of " + p); 179 } 180 } 181 } 182 183 /** 184 * n 185 */ 186 @Test 187 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 188 final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 189 byte[] family1 = Bytes.toBytes("cf1"); 190 byte[] family2 = Bytes.toBytes("cf2"); 191 byte[] qualifier = Bytes.toBytes("q"); 192 byte[] value = Bytes.toBytes("testV"); 193 byte[][] familys = { family1, family2 }; 194 TEST_UTIL.createTable(tableName, familys); 195 Table htable = TEST_UTIL.getConnection().getTable(tableName); 196 Put put = new Put(Bytes.toBytes("r1")); 197 put.addColumn(family1, qualifier, value); 198 htable.put(put); 199 ResultScanner resultScanner = htable.getScanner(new Scan()); 200 int count = 0; 201 while (resultScanner.next() != null) { 202 count++; 203 } 204 resultScanner.close(); 205 assertEquals(1, count); 206 207 MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 208 List<HRegion> regions = hbaseCluster.getRegions(tableName); 209 assertEquals(1, regions.size()); 210 211 // move region to another regionserver 212 Region destRegion = regions.get(0); 213 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 214 assertTrue("Please start more than 1 regionserver", 215 hbaseCluster.getRegionServerThreads().size() > 1); 216 int destServerNum = 0; 217 while (destServerNum == originServerNum) { 218 destServerNum++; 219 } 220 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 221 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 222 // move region to destination regionserver 223 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 224 225 // delete the row 226 Delete del = new Delete(Bytes.toBytes("r1")); 227 htable.delete(del); 228 resultScanner = htable.getScanner(new Scan()); 229 count = 0; 230 while (resultScanner.next() != null) { 231 count++; 232 } 233 resultScanner.close(); 234 assertEquals(0, count); 235 236 // flush region and make major compaction 237 HRegion region = 238 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 239 region.flush(true); 240 // wait to complete major compaction 241 for (HStore store : region.getStores()) { 242 store.triggerMajorCompaction(); 243 } 244 region.compact(true); 245 246 // move region to origin regionserver 247 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 248 // abort the origin regionserver 249 originServer.abort("testing"); 250 251 // see what we get 252 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 253 if (result != null) { 254 assertTrue("Row is deleted, but we get" + result.toString(), 255 (result == null) || result.isEmpty()); 256 } 257 resultScanner.close(); 258 } 259 260 /** 261 * Tests for hbase-2727. n * @see 262 * <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 263 */ 264 @Test 265 public void test2727() throws Exception { 266 // Test being able to have > 1 set of edits in the recovered.edits directory. 267 // Ensure edits are replayed properly. 268 final TableName tableName = TableName.valueOf("test2727"); 269 270 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 271 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 272 Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 273 deleteDir(basedir); 274 275 HTableDescriptor htd = createBasic3FamilyHTD(tableName); 276 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 277 HBaseTestingUtility.closeRegionAndWAL(region2); 278 final byte[] rowName = tableName.getName(); 279 280 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 281 // Add 1k to each family. 282 final int countPerFamily = 1000; 283 284 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 285 for (byte[] fam : htd.getFamiliesKeys()) { 286 scopes.put(fam, 0); 287 } 288 for (HColumnDescriptor hcd : htd.getFamilies()) { 289 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1, htd, mvcc, 290 scopes); 291 } 292 wal1.shutdown(); 293 runWALSplit(this.conf); 294 295 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 296 // Add 1k to each family. 297 for (HColumnDescriptor hcd : htd.getFamilies()) { 298 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2, htd, mvcc, 299 scopes); 300 } 301 wal2.shutdown(); 302 runWALSplit(this.conf); 303 304 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 305 try { 306 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3); 307 long seqid = region.getOpenSeqNum(); 308 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 309 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 310 assertTrue(seqid > mvcc.getWritePoint()); 311 assertEquals(seqid - 1, mvcc.getWritePoint()); 312 LOG.debug( 313 "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint()); 314 315 // TODO: Scan all. 316 region.close(); 317 } finally { 318 wal3.close(); 319 } 320 } 321 322 /** 323 * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'. 324 * nnnnn 325 */ 326 @Test 327 public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException, 328 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 329 final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 330 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 331 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 332 deleteDir(basedir); 333 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 334 Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 335 HBaseTestingUtility.closeRegionAndWAL(region2); 336 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 337 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 338 339 byte[] family = htd.getFamilies().iterator().next().getName(); 340 Path f = new Path(basedir, "hfile"); 341 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 342 Bytes.toBytes("z"), 10); 343 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 344 hfs.add(Pair.newPair(family, f.toString())); 345 region.bulkLoadHFiles(hfs, true, null); 346 347 // Add an edit so something in the WAL 348 byte[] row = tableName.getName(); 349 region.put((new Put(row)).addColumn(family, family, family)); 350 wal.sync(); 351 final int rowsInsertedCount = 11; 352 353 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 354 355 // Now 'crash' the region by stealing its wal 356 final Configuration newConf = HBaseConfiguration.create(this.conf); 357 User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); 358 user.runAs(new PrivilegedExceptionAction() { 359 @Override 360 public Object run() throws Exception { 361 runWALSplit(newConf); 362 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 363 364 HRegion region2 = 365 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 366 long seqid2 = region2.getOpenSeqNum(); 367 assertTrue(seqid2 > -1); 368 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 369 370 // I can't close wal1. Its been appropriated when we split. 371 region2.close(); 372 wal2.close(); 373 return null; 374 } 375 }); 376 } 377 378 /** 379 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 380 * files) and an edit in the memstore. This is for HBASE-10958 "[dataloss] Bulk loading with 381 * seqids can prevent some log entries from being replayed" nnnnn 382 */ 383 @Test 384 public void testCompactedBulkLoadedFiles() throws IOException, SecurityException, 385 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 386 final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles"); 387 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 388 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 389 deleteDir(basedir); 390 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 391 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 392 HBaseTestingUtility.closeRegionAndWAL(region2); 393 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 394 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 395 396 // Add an edit so something in the WAL 397 byte[] row = tableName.getName(); 398 byte[] family = htd.getFamilies().iterator().next().getName(); 399 region.put((new Put(row)).addColumn(family, family, family)); 400 wal.sync(); 401 402 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 403 for (int i = 0; i < 3; i++) { 404 Path f = new Path(basedir, "hfile" + i); 405 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 406 Bytes.toBytes(i + "50"), 10); 407 hfs.add(Pair.newPair(family, f.toString())); 408 } 409 region.bulkLoadHFiles(hfs, true, null); 410 final int rowsInsertedCount = 31; 411 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 412 413 // major compact to turn all the bulk loaded files into one normal file 414 region.compact(true); 415 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 416 417 // Now 'crash' the region by stealing its wal 418 final Configuration newConf = HBaseConfiguration.create(this.conf); 419 User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); 420 user.runAs(new PrivilegedExceptionAction() { 421 @Override 422 public Object run() throws Exception { 423 runWALSplit(newConf); 424 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 425 426 HRegion region2 = 427 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 428 long seqid2 = region2.getOpenSeqNum(); 429 assertTrue(seqid2 > -1); 430 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 431 432 // I can't close wal1. Its been appropriated when we split. 433 region2.close(); 434 wal2.close(); 435 return null; 436 } 437 }); 438 } 439 440 /** 441 * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify 442 * seqids. nnnnn 443 */ 444 @Test 445 public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException, 446 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 447 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 448 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 449 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 450 deleteDir(basedir); 451 final byte[] rowName = tableName.getName(); 452 final int countPerFamily = 10; 453 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 454 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 455 HBaseTestingUtility.closeRegionAndWAL(region3); 456 // Write countPerFamily edits into the three families. Do a flush on one 457 // of the families during the load of edits so its seqid is not same as 458 // others to test we do right thing when different seqids. 459 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 460 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 461 long seqid = region.getOpenSeqNum(); 462 boolean first = true; 463 for (HColumnDescriptor hcd : htd.getFamilies()) { 464 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 465 if (first) { 466 // If first, so we have at least one family w/ different seqid to rest. 467 region.flush(true); 468 first = false; 469 } 470 } 471 // Now assert edits made it in. 472 final Get g = new Get(rowName); 473 Result result = region.get(g); 474 assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); 475 // Now close the region (without flush), split the log, reopen the region and assert that 476 // replay of log has the correct effect, that our seqids are calculated correctly so 477 // all edits in logs are seen as 'stale'/old. 478 region.close(true); 479 wal.shutdown(); 480 runWALSplit(this.conf); 481 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 482 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 483 long seqid2 = region2.getOpenSeqNum(); 484 assertTrue(seqid + result.size() < seqid2); 485 final Result result1b = region2.get(g); 486 assertEquals(result.size(), result1b.size()); 487 488 // Next test. Add more edits, then 'crash' this region by stealing its wal 489 // out from under it and assert that replay of the log adds the edits back 490 // correctly when region is opened again. 491 for (HColumnDescriptor hcd : htd.getFamilies()) { 492 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 493 } 494 // Get count of edits. 495 final Result result2 = region2.get(g); 496 assertEquals(2 * result.size(), result2.size()); 497 wal2.sync(); 498 final Configuration newConf = HBaseConfiguration.create(this.conf); 499 User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); 500 user.runAs(new PrivilegedExceptionAction<Object>() { 501 @Override 502 public Object run() throws Exception { 503 runWALSplit(newConf); 504 FileSystem newFS = FileSystem.get(newConf); 505 // Make a new wal for new region open. 506 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 507 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 508 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 509 @Override 510 protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) { 511 super.restoreEdit(s, cell, memstoreSizing); 512 countOfRestoredEdits.incrementAndGet(); 513 } 514 }; 515 long seqid3 = region3.initialize(); 516 Result result3 = region3.get(g); 517 // Assert that count of cells is same as before crash. 518 assertEquals(result2.size(), result3.size()); 519 assertEquals(htd.getFamilies().size() * countPerFamily, countOfRestoredEdits.get()); 520 521 // I can't close wal1. Its been appropriated when we split. 522 region3.close(); 523 wal3.close(); 524 return null; 525 } 526 }); 527 } 528 529 /** 530 * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores 531 * got flushed but others did not. Unfortunately, there is no easy hook to flush at a store level. 532 * The way we get around this is by flushing at the region level, and then deleting the recently 533 * flushed store file for one of the Stores. This would put us back in the situation where all but 534 * that store got flushed and the region died. We restart Region again, and verify that the edits 535 * were replayed. nnnnn 536 */ 537 @Test 538 public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException, 539 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 540 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 541 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 542 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 543 deleteDir(basedir); 544 final byte[] rowName = tableName.getName(); 545 final int countPerFamily = 10; 546 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 547 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 548 HBaseTestingUtility.closeRegionAndWAL(region3); 549 // Write countPerFamily edits into the three families. Do a flush on one 550 // of the families during the load of edits so its seqid is not same as 551 // others to test we do right thing when different seqids. 552 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 553 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 554 long seqid = region.getOpenSeqNum(); 555 for (HColumnDescriptor hcd : htd.getFamilies()) { 556 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 557 } 558 559 // Now assert edits made it in. 560 final Get g = new Get(rowName); 561 Result result = region.get(g); 562 assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); 563 564 // Let us flush the region 565 region.flush(true); 566 region.close(true); 567 wal.shutdown(); 568 569 // delete the store files in the second column family to simulate a failure 570 // in between the flushcache(); 571 // we have 3 families. killing the middle one ensures that taking the maximum 572 // will make us fail. 573 int cf_count = 0; 574 for (HColumnDescriptor hcd : htd.getFamilies()) { 575 cf_count++; 576 if (cf_count == 2) { 577 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 578 } 579 } 580 581 // Let us try to split and recover 582 runWALSplit(this.conf); 583 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 584 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 585 long seqid2 = region2.getOpenSeqNum(); 586 assertTrue(seqid + result.size() < seqid2); 587 588 final Result result1b = region2.get(g); 589 assertEquals(result.size(), result1b.size()); 590 } 591 592 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 593 // Only throws exception if throwExceptionWhenFlushing is set true. 594 public static class CustomStoreFlusher extends DefaultStoreFlusher { 595 // Switch between throw and not throw exception in flush 596 public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 597 598 public CustomStoreFlusher(Configuration conf, HStore store) { 599 super(conf, store); 600 } 601 602 @Override 603 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 604 MonitoredTask status, ThroughputController throughputController, 605 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 606 if (throwExceptionWhenFlushing.get()) { 607 throw new IOException("Simulated exception by tests"); 608 } 609 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 610 writerCreationTracker); 611 } 612 }; 613 614 /** 615 * Test that we could recover the data correctly after aborting flush. In the test, first we abort 616 * flush after writing some data, then writing more data and flush again, at last verify the data. 617 * n 618 */ 619 @Test 620 public void testReplayEditsAfterAbortingFlush() throws IOException { 621 final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush"); 622 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 623 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 624 deleteDir(basedir); 625 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 626 HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 627 HBaseTestingUtility.closeRegionAndWAL(region3); 628 // Write countPerFamily edits into the three families. Do a flush on one 629 // of the families during the load of edits so its seqid is not same as 630 // others to test we do right thing when different seqids. 631 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 632 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 633 Mockito.doReturn(false).when(rsServices).isAborted(); 634 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 635 when(rsServices.getConfiguration()).thenReturn(conf); 636 Configuration customConf = new Configuration(this.conf); 637 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 638 CustomStoreFlusher.class.getName()); 639 HRegion region = 640 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 641 int writtenRowCount = 10; 642 List<HColumnDescriptor> families = new ArrayList<>(htd.getFamilies()); 643 for (int i = 0; i < writtenRowCount; i++) { 644 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 645 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 646 Bytes.toBytes("val")); 647 region.put(put); 648 } 649 650 // Now assert edits made it in. 651 RegionScanner scanner = region.getScanner(new Scan()); 652 assertEquals(writtenRowCount, getScannedCount(scanner)); 653 654 // Let us flush the region 655 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 656 try { 657 region.flush(true); 658 fail("Injected exception hasn't been thrown"); 659 } catch (IOException e) { 660 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 661 // simulated to abort server 662 Mockito.doReturn(true).when(rsServices).isAborted(); 663 region.setClosing(false); // region normally does not accept writes after 664 // DroppedSnapshotException. We mock around it for this test. 665 } 666 // writing more data 667 int moreRow = 10; 668 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 669 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 670 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 671 Bytes.toBytes("val")); 672 region.put(put); 673 } 674 writtenRowCount += moreRow; 675 // call flush again 676 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 677 try { 678 region.flush(true); 679 } catch (IOException t) { 680 LOG.info( 681 "Expected exception when flushing region because server is stopped," + t.getMessage()); 682 } 683 684 region.close(true); 685 wal.shutdown(); 686 687 // Let us try to split and recover 688 runWALSplit(this.conf); 689 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 690 Mockito.doReturn(false).when(rsServices).isAborted(); 691 HRegion region2 = 692 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 693 scanner = region2.getScanner(new Scan()); 694 assertEquals(writtenRowCount, getScannedCount(scanner)); 695 } 696 697 private int getScannedCount(RegionScanner scanner) throws IOException { 698 int scannedCount = 0; 699 List<Cell> results = new ArrayList<>(); 700 while (true) { 701 boolean existMore = scanner.next(results); 702 if (!results.isEmpty()) scannedCount++; 703 if (!existMore) break; 704 results.clear(); 705 } 706 return scannedCount; 707 } 708 709 /** 710 * Create an HRegion with the result of a WAL split and test we only see the good edits n 711 */ 712 @Test 713 public void testReplayEditsWrittenIntoWAL() throws Exception { 714 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 715 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 716 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 717 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 718 deleteDir(basedir); 719 720 final HTableDescriptor htd = createBasic3FamilyHTD(tableName); 721 HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 722 HBaseTestingUtility.closeRegionAndWAL(region2); 723 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 724 final byte[] rowName = tableName.getName(); 725 final byte[] regionName = hri.getEncodedNameAsBytes(); 726 727 // Add 1k to each family. 728 final int countPerFamily = 1000; 729 Set<byte[]> familyNames = new HashSet<>(); 730 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 731 for (byte[] fam : htd.getFamiliesKeys()) { 732 scopes.put(fam, 0); 733 } 734 for (HColumnDescriptor hcd : htd.getFamilies()) { 735 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, htd, mvcc, 736 scopes); 737 familyNames.add(hcd.getName()); 738 } 739 740 // Add a cache flush, shouldn't have any effect 741 wal.startCacheFlush(regionName, familyNames); 742 wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM); 743 744 // Add an edit to another family, should be skipped. 745 WALEdit edit = new WALEdit(); 746 long now = ee.currentTime(); 747 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); 748 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 749 edit); 750 751 // Delete the c family to verify deletes make it over. 752 edit = new WALEdit(); 753 now = ee.currentTime(); 754 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 755 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 756 edit); 757 758 // Sync. 759 wal.sync(); 760 // Make a new conf and a new fs for the splitter to run on so we can take 761 // over old wal. 762 final Configuration newConf = HBaseConfiguration.create(this.conf); 763 User user = HBaseTestingUtility.getDifferentUser(newConf, ".replay.wal.secondtime"); 764 user.runAs(new PrivilegedExceptionAction<Void>() { 765 @Override 766 public Void run() throws Exception { 767 runWALSplit(newConf); 768 FileSystem newFS = FileSystem.get(newConf); 769 // 100k seems to make for about 4 flushes during HRegion#initialize. 770 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 771 // Make a new wal for new region. 772 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 773 final AtomicInteger flushcount = new AtomicInteger(0); 774 try { 775 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 776 @Override 777 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 778 final Collection<HStore> storesToFlush, MonitoredTask status, 779 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 780 LOG.info("InternalFlushCache Invoked"); 781 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 782 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 783 flushcount.incrementAndGet(); 784 return fs; 785 } 786 }; 787 // The seq id this region has opened up with 788 long seqid = region.initialize(); 789 790 // The mvcc readpoint of from inserting data. 791 long writePoint = mvcc.getWritePoint(); 792 793 // We flushed during init. 794 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); 795 assertTrue((seqid - 1) == writePoint); 796 797 Get get = new Get(rowName); 798 Result result = region.get(get); 799 // Make sure we only see the good edits 800 assertEquals(countPerFamily * (htd.getFamilies().size() - 1), result.size()); 801 region.close(); 802 } finally { 803 newWal.close(); 804 } 805 return null; 806 } 807 }); 808 } 809 810 @Test 811 // the following test is for HBASE-6065 812 public void testSequentialEditLogSeqNum() throws IOException { 813 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 814 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 815 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 816 deleteDir(basedir); 817 final byte[] rowName = tableName.getName(); 818 final int countPerFamily = 10; 819 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 820 821 // Mock the WAL 822 MockWAL wal = createMockWAL(); 823 824 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 825 for (HColumnDescriptor hcd : htd.getFamilies()) { 826 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 827 } 828 829 // Let us flush the region 830 // But this time completeflushcache is not yet done 831 region.flush(true); 832 for (HColumnDescriptor hcd : htd.getFamilies()) { 833 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 834 } 835 long lastestSeqNumber = region.getReadPoint(null); 836 // get the current seq no 837 wal.doCompleteCacheFlush = true; 838 // allow complete cache flush with the previous seq number got after first 839 // set of edits. 840 wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 841 wal.shutdown(); 842 FileStatus[] listStatus = wal.getFiles(); 843 assertNotNull(listStatus); 844 assertTrue(listStatus.length > 0); 845 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, 846 wals, null); 847 FileStatus[] listStatus1 = 848 this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName), 849 new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { 850 @Override 851 public boolean accept(Path p) { 852 return !WALSplitUtil.isSequenceIdFile(p); 853 } 854 }); 855 int editCount = 0; 856 for (FileStatus fileStatus : listStatus1) { 857 editCount = Integer.parseInt(fileStatus.getPath().getName()); 858 } 859 // The sequence number should be same 860 assertEquals( 861 "The sequence number of the recoverd.edits and the current edit seq should be same", 862 lastestSeqNumber, editCount); 863 } 864 865 /** 866 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 867 */ 868 @Test 869 public void testDatalossWhenInputError() throws Exception { 870 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 871 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 872 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 873 deleteDir(basedir); 874 final byte[] rowName = tableName.getName(); 875 final int countPerFamily = 10; 876 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 877 HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 878 Path regionDir = region1.getWALRegionDir(); 879 HBaseTestingUtility.closeRegionAndWAL(region1); 880 881 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 882 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 883 for (HColumnDescriptor hcd : htd.getFamilies()) { 884 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 885 } 886 // Now assert edits made it in. 887 final Get g = new Get(rowName); 888 Result result = region.get(g); 889 assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); 890 // Now close the region (without flush), split the log, reopen the region and assert that 891 // replay of log has the correct effect. 892 region.close(true); 893 wal.shutdown(); 894 895 runWALSplit(this.conf); 896 897 // here we let the DFSInputStream throw an IOException just after the WALHeader. 898 Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); 899 FSDataInputStream stream = fs.open(editFile); 900 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); 901 Class<? extends AbstractFSWALProvider.Reader> logReaderClass = 902 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 903 AbstractFSWALProvider.Reader.class); 904 AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance(); 905 reader.init(this.fs, editFile, conf, stream); 906 final long headerLength = stream.getPos(); 907 reader.close(); 908 FileSystem spyFs = spy(this.fs); 909 doAnswer(new Answer<FSDataInputStream>() { 910 911 @Override 912 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 913 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 914 Field field = FilterInputStream.class.getDeclaredField("in"); 915 field.setAccessible(true); 916 final DFSInputStream in = (DFSInputStream) field.get(stream); 917 DFSInputStream spyIn = spy(in); 918 doAnswer(new Answer<Integer>() { 919 920 private long pos; 921 922 @Override 923 public Integer answer(InvocationOnMock invocation) throws Throwable { 924 if (pos >= headerLength) { 925 throw new IOException("read over limit"); 926 } 927 int b = (Integer) invocation.callRealMethod(); 928 if (b > 0) { 929 pos += b; 930 } 931 return b; 932 } 933 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 934 doAnswer(new Answer<Void>() { 935 936 @Override 937 public Void answer(InvocationOnMock invocation) throws Throwable { 938 invocation.callRealMethod(); 939 in.close(); 940 return null; 941 } 942 }).when(spyIn).close(); 943 field.set(stream, spyIn); 944 return stream; 945 } 946 }).when(spyFs).open(eq(editFile)); 947 948 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 949 HRegion region2; 950 try { 951 // log replay should fail due to the IOException, otherwise we may lose data. 952 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 953 assertEquals(result.size(), region2.get(g).size()); 954 } catch (IOException e) { 955 assertEquals("read over limit", e.getMessage()); 956 } 957 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 958 assertEquals(result.size(), region2.get(g).size()); 959 } 960 961 /** 962 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 963 */ 964 private void testNameConflictWhenSplit(boolean largeFirst) 965 throws IOException, StreamLacksCapabilityException { 966 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 967 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 968 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 969 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 970 deleteDir(basedir); 971 972 final HTableDescriptor htd = createBasic1FamilyHTD(tableName); 973 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 974 for (byte[] fam : htd.getFamiliesKeys()) { 975 scopes.put(fam, 0); 976 } 977 HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 978 HBaseTestingUtility.closeRegionAndWAL(region); 979 final byte[] family = htd.getColumnFamilies()[0].getName(); 980 final byte[] rowName = tableName.getName(); 981 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 982 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 983 984 Path largeFile = new Path(logDir, "wal-1"); 985 Path smallFile = new Path(logDir, "wal-2"); 986 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 987 writerWALFile(smallFile, Arrays.asList(entry2)); 988 FileStatus first, second; 989 if (largeFirst) { 990 first = fs.getFileStatus(largeFile); 991 second = fs.getFileStatus(smallFile); 992 } else { 993 first = fs.getFileStatus(smallFile); 994 second = fs.getFileStatus(largeFile); 995 } 996 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); 997 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); 998 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 999 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1000 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1001 assertEquals(2, region.get(new Get(rowName)).size()); 1002 } 1003 1004 @Test 1005 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1006 testNameConflictWhenSplit(true); 1007 } 1008 1009 @Test 1010 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1011 testNameConflictWhenSplit(false); 1012 } 1013 1014 static class MockWAL extends FSHLog { 1015 boolean doCompleteCacheFlush = false; 1016 1017 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1018 throws IOException { 1019 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1020 } 1021 1022 @Override 1023 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 1024 if (!doCompleteCacheFlush) { 1025 return; 1026 } 1027 super.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 1028 } 1029 } 1030 1031 private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1032 HTableDescriptor htd = new HTableDescriptor(tableName); 1033 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1034 htd.addFamily(a); 1035 return htd; 1036 } 1037 1038 private MockWAL createMockWAL() throws IOException { 1039 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1040 wal.init(); 1041 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1042 // long gone. 1043 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1044 return wal; 1045 } 1046 1047 // Flusher used in this test. Keep count of how often we are called and 1048 // actually run the flush inside here. 1049 static class TestFlusher implements FlushRequester { 1050 private HRegion r; 1051 1052 @Override 1053 public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { 1054 try { 1055 r.flush(false); 1056 return true; 1057 } catch (IOException e) { 1058 throw new RuntimeException("Exception flushing", e); 1059 } 1060 } 1061 1062 @Override 1063 public boolean requestFlush(HRegion region, List<byte[]> families, 1064 FlushLifeCycleTracker tracker) { 1065 return true; 1066 } 1067 1068 @Override 1069 public boolean requestDelayedFlush(HRegion region, long when) { 1070 return true; 1071 } 1072 1073 @Override 1074 public void registerFlushRequestListener(FlushRequestListener listener) { 1075 1076 } 1077 1078 @Override 1079 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1080 return false; 1081 } 1082 1083 @Override 1084 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1085 1086 } 1087 } 1088 1089 private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri, 1090 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1091 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1092 } 1093 1094 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1095 int index) { 1096 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1097 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1098 WALEdit edit = new WALEdit(); 1099 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1100 return edit; 1101 } 1102 1103 private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, 1104 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1105 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1106 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1107 createWALEdit(rowName, family, ee, index), hri, true, null); 1108 entry.stampRegionSequenceId(mvcc.begin()); 1109 return entry; 1110 } 1111 1112 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, 1113 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1114 final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc, 1115 NavigableMap<byte[], Integer> scopes) throws IOException { 1116 for (int j = 0; j < count; j++) { 1117 wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), 1118 createWALEdit(rowName, family, ee, j)); 1119 } 1120 wal.sync(); 1121 } 1122 1123 public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1124 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1125 List<Put> puts = new ArrayList<>(); 1126 for (int j = 0; j < count; j++) { 1127 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1128 Put p = new Put(rowName); 1129 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1130 r.put(p); 1131 puts.add(p); 1132 } 1133 return puts; 1134 } 1135 1136 /* 1137 * Creates an HRI around an HTD that has <code>tableName</code> and three column families named 1138 * 'a','b', and 'c'. 1139 * @param tableName Name of table to use when we create HTableDescriptor. 1140 */ 1141 private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1142 return new HRegionInfo(tableName, null, null, false); 1143 } 1144 1145 /* 1146 * Run the split. Verify only single split file made. n * @return The single split file made n 1147 */ 1148 private Path runWALSplit(final Configuration c) throws IOException { 1149 List<Path> splits = 1150 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1151 // Split should generate only 1 file since there's only 1 region 1152 assertEquals("splits=" + splits, 1, splits.size()); 1153 // Make sure the file exists 1154 assertTrue(fs.exists(splits.get(0))); 1155 LOG.info("Split file=" + splits.get(0)); 1156 return splits.get(0); 1157 } 1158 1159 private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1160 HTableDescriptor htd = new HTableDescriptor(tableName); 1161 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); 1162 htd.addFamily(a); 1163 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b")); 1164 htd.addFamily(b); 1165 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c")); 1166 htd.addFamily(c); 1167 return htd; 1168 } 1169 1170 private void writerWALFile(Path file, List<FSWALEntry> entries) 1171 throws IOException, StreamLacksCapabilityException { 1172 fs.mkdirs(file.getParent()); 1173 ProtobufLogWriter writer = new ProtobufLogWriter(); 1174 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file), 1175 StreamSlowMonitor.create(conf, "testMonitor")); 1176 for (FSWALEntry entry : entries) { 1177 writer.append(entry); 1178 } 1179 writer.sync(false); 1180 writer.close(); 1181 } 1182 1183 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1184 throws IOException; 1185}