001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyInt; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.spy; 029import static org.mockito.Mockito.when; 030 031import java.io.FilterInputStream; 032import java.io.IOException; 033import java.lang.reflect.Field; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.HashSet; 039import java.util.List; 040import java.util.NavigableMap; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicInteger; 045import java.util.function.Consumer; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.KeyValue; 057import org.apache.hadoop.hbase.ServerName; 058import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 062import org.apache.hadoop.hbase.client.Delete; 063import org.apache.hadoop.hbase.client.Get; 064import org.apache.hadoop.hbase.client.Put; 065import org.apache.hadoop.hbase.client.RegionInfo; 066import org.apache.hadoop.hbase.client.RegionInfoBuilder; 067import org.apache.hadoop.hbase.client.Result; 068import org.apache.hadoop.hbase.client.ResultScanner; 069import org.apache.hadoop.hbase.client.Scan; 070import org.apache.hadoop.hbase.client.Table; 071import org.apache.hadoop.hbase.client.TableDescriptor; 072import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 073import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 074import org.apache.hadoop.hbase.monitoring.MonitoredTask; 075import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 076import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 077import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 078import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 079import org.apache.hadoop.hbase.regionserver.FlushRequester; 080import org.apache.hadoop.hbase.regionserver.HRegion; 081import org.apache.hadoop.hbase.regionserver.HRegionServer; 082import org.apache.hadoop.hbase.regionserver.HStore; 083import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 084import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 085import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 086import org.apache.hadoop.hbase.regionserver.Region; 087import org.apache.hadoop.hbase.regionserver.RegionScanner; 088import org.apache.hadoop.hbase.regionserver.RegionServerServices; 089import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 090import org.apache.hadoop.hbase.security.User; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.CommonFSUtils; 093import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 094import org.apache.hadoop.hbase.util.EnvironmentEdge; 095import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 096import org.apache.hadoop.hbase.util.HFileTestUtil; 097import org.apache.hadoop.hbase.util.Pair; 098import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 099import org.apache.hadoop.hbase.wal.WAL; 100import org.apache.hadoop.hbase.wal.WALEdit; 101import org.apache.hadoop.hbase.wal.WALFactory; 102import org.apache.hadoop.hbase.wal.WALKeyImpl; 103import org.apache.hadoop.hbase.wal.WALSplitUtil; 104import org.apache.hadoop.hbase.wal.WALSplitter; 105import org.apache.hadoop.hdfs.DFSInputStream; 106import org.junit.After; 107import org.junit.AfterClass; 108import org.junit.Before; 109import org.junit.BeforeClass; 110import org.junit.Rule; 111import org.junit.Test; 112import org.junit.rules.TestName; 113import org.mockito.Mockito; 114import org.mockito.invocation.InvocationOnMock; 115import org.mockito.stubbing.Answer; 116import org.slf4j.Logger; 117import org.slf4j.LoggerFactory; 118 119/** 120 * Test replay of edits out of a WAL split. 121 */ 122public abstract class AbstractTestWALReplay { 123 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 124 static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 125 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 126 private Path hbaseRootDir = null; 127 private String logName; 128 private Path oldLogDir; 129 private Path logDir; 130 private FileSystem fs; 131 private Configuration conf; 132 private WALFactory wals; 133 134 @Rule 135 public final TestName currentTest = new TestName(); 136 137 @BeforeClass 138 public static void setUpBeforeClass() throws Exception { 139 Configuration conf = TEST_UTIL.getConfiguration(); 140 // The below config supported by 0.20-append and CDH3b2 141 conf.setInt("dfs.client.block.recovery.retries", 2); 142 TEST_UTIL.startMiniCluster(3); 143 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 144 LOG.info("hbase.rootdir=" + hbaseRootDir); 145 CommonFSUtils.setRootDir(conf, hbaseRootDir); 146 } 147 148 @AfterClass 149 public static void tearDownAfterClass() throws Exception { 150 TEST_UTIL.shutdownMiniCluster(); 151 } 152 153 @Before 154 public void setUp() throws Exception { 155 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 156 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 157 this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf); 158 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 159 String serverName = ServerName 160 .valueOf(currentTest.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime()) 161 .toString(); 162 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 163 this.logDir = new Path(this.hbaseRootDir, logName); 164 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 165 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 166 } 167 this.wals = new WALFactory(conf, currentTest.getMethodName()); 168 } 169 170 @After 171 public void tearDown() throws Exception { 172 this.wals.close(); 173 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 174 } 175 176 /* 177 * @param p Directory to cleanup 178 */ 179 private void deleteDir(final Path p) throws IOException { 180 if (this.fs.exists(p)) { 181 if (!this.fs.delete(p, true)) { 182 throw new IOException("Failed remove of " + p); 183 } 184 } 185 } 186 187 /** 188 * */ 189 @Test 190 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 191 final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 192 byte[] family1 = Bytes.toBytes("cf1"); 193 byte[] family2 = Bytes.toBytes("cf2"); 194 byte[] qualifier = Bytes.toBytes("q"); 195 byte[] value = Bytes.toBytes("testV"); 196 byte[][] familys = { family1, family2 }; 197 TEST_UTIL.createTable(tableName, familys); 198 Table htable = TEST_UTIL.getConnection().getTable(tableName); 199 Put put = new Put(Bytes.toBytes("r1")); 200 put.addColumn(family1, qualifier, value); 201 htable.put(put); 202 ResultScanner resultScanner = htable.getScanner(new Scan()); 203 int count = 0; 204 while (resultScanner.next() != null) { 205 count++; 206 } 207 resultScanner.close(); 208 assertEquals(1, count); 209 210 SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 211 List<HRegion> regions = hbaseCluster.getRegions(tableName); 212 assertEquals(1, regions.size()); 213 214 // move region to another regionserver 215 Region destRegion = regions.get(0); 216 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 217 assertTrue("Please start more than 1 regionserver", 218 hbaseCluster.getRegionServerThreads().size() > 1); 219 int destServerNum = 0; 220 while (destServerNum == originServerNum) { 221 destServerNum++; 222 } 223 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 224 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 225 // move region to destination regionserver 226 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 227 228 // delete the row 229 Delete del = new Delete(Bytes.toBytes("r1")); 230 htable.delete(del); 231 resultScanner = htable.getScanner(new Scan()); 232 count = 0; 233 while (resultScanner.next() != null) { 234 count++; 235 } 236 resultScanner.close(); 237 assertEquals(0, count); 238 239 // flush region and make major compaction 240 HRegion region = 241 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 242 region.flush(true); 243 // wait to complete major compaction 244 for (HStore store : region.getStores()) { 245 store.triggerMajorCompaction(); 246 } 247 region.compact(true); 248 249 // move region to origin regionserver 250 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 251 // abort the origin regionserver 252 originServer.abort("testing"); 253 254 // see what we get 255 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 256 if (result != null) { 257 assertTrue("Row is deleted, but we get" + result.toString(), 258 (result == null) || result.isEmpty()); 259 } 260 resultScanner.close(); 261 } 262 263 /** 264 * Tests for hbase-2727. 265 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 266 */ 267 @Test 268 public void test2727() throws Exception { 269 // Test being able to have > 1 set of edits in the recovered.edits directory. 270 // Ensure edits are replayed properly. 271 final TableName tableName = TableName.valueOf("test2727"); 272 273 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 274 RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 275 Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 276 deleteDir(basedir); 277 278 TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName); 279 Region region2 = 280 HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor); 281 HBaseTestingUtil.closeRegionAndWAL(region2); 282 final byte[] rowName = tableName.getName(); 283 284 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 285 // Add 1k to each family. 286 final int countPerFamily = 1000; 287 288 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 289 for (byte[] fam : tableDescriptor.getColumnFamilyNames()) { 290 scopes.put(fam, 0); 291 } 292 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 293 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1, 294 mvcc, scopes); 295 } 296 wal1.shutdown(); 297 runWALSplit(this.conf); 298 299 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 300 // Add 1k to each family. 301 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 302 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2, 303 mvcc, scopes); 304 } 305 wal2.shutdown(); 306 runWALSplit(this.conf); 307 308 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 309 try { 310 HRegion region = 311 HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3); 312 long seqid = region.getOpenSeqNum(); 313 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 314 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 315 assertTrue(seqid > mvcc.getWritePoint()); 316 assertEquals(seqid - 1, mvcc.getWritePoint()); 317 LOG.debug( 318 "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint()); 319 320 // TODO: Scan all. 321 region.close(); 322 } finally { 323 wal3.close(); 324 } 325 } 326 327 /** 328 * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'. 329 */ 330 @Test 331 public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException, 332 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 333 final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 334 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 335 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 336 deleteDir(basedir); 337 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 338 Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 339 HBaseTestingUtil.closeRegionAndWAL(region2); 340 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 341 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 342 343 byte[] family = htd.getColumnFamilies()[0].getName(); 344 Path f = new Path(basedir, "hfile"); 345 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 346 Bytes.toBytes("z"), 10); 347 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 348 hfs.add(Pair.newPair(family, f.toString())); 349 region.bulkLoadHFiles(hfs, true, null); 350 351 // Add an edit so something in the WAL 352 byte[] row = tableName.getName(); 353 region.put((new Put(row)).addColumn(family, family, family)); 354 wal.sync(); 355 final int rowsInsertedCount = 11; 356 357 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 358 359 // Now 'crash' the region by stealing its wal 360 final Configuration newConf = HBaseConfiguration.create(this.conf); 361 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 362 user.runAs(new PrivilegedExceptionAction() { 363 @Override 364 public Object run() throws Exception { 365 runWALSplit(newConf); 366 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 367 368 HRegion region2 = 369 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 370 long seqid2 = region2.getOpenSeqNum(); 371 assertTrue(seqid2 > -1); 372 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 373 374 // I can't close wal1. Its been appropriated when we split. 375 region2.close(); 376 wal2.close(); 377 return null; 378 } 379 }); 380 } 381 382 /** 383 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 384 * files) and an edit in the memstore. 385 * <p/> 386 * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from 387 * being replayed" 388 */ 389 @Test 390 public void testCompactedBulkLoadedFiles() throws IOException, SecurityException, 391 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 392 final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles"); 393 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 394 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 395 deleteDir(basedir); 396 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 397 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 398 HBaseTestingUtil.closeRegionAndWAL(region2); 399 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 400 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 401 402 // Add an edit so something in the WAL 403 byte[] row = tableName.getName(); 404 byte[] family = htd.getColumnFamilies()[0].getName(); 405 region.put((new Put(row)).addColumn(family, family, family)); 406 wal.sync(); 407 408 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 409 for (int i = 0; i < 3; i++) { 410 Path f = new Path(basedir, "hfile" + i); 411 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 412 Bytes.toBytes(i + "50"), 10); 413 hfs.add(Pair.newPair(family, f.toString())); 414 } 415 region.bulkLoadHFiles(hfs, true, null); 416 final int rowsInsertedCount = 31; 417 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 418 419 // major compact to turn all the bulk loaded files into one normal file 420 region.compact(true); 421 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 422 423 // Now 'crash' the region by stealing its wal 424 final Configuration newConf = HBaseConfiguration.create(this.conf); 425 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 426 user.runAs(new PrivilegedExceptionAction() { 427 @Override 428 public Object run() throws Exception { 429 runWALSplit(newConf); 430 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 431 432 HRegion region2 = 433 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 434 long seqid2 = region2.getOpenSeqNum(); 435 assertTrue(seqid2 > -1); 436 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 437 438 // I can't close wal1. Its been appropriated when we split. 439 region2.close(); 440 wal2.close(); 441 return null; 442 } 443 }); 444 } 445 446 /** 447 * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify 448 * seqids. 449 */ 450 @Test 451 public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException, 452 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 453 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 454 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 455 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 456 deleteDir(basedir); 457 final byte[] rowName = tableName.getName(); 458 final int countPerFamily = 10; 459 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 460 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 461 HBaseTestingUtil.closeRegionAndWAL(region3); 462 // Write countPerFamily edits into the three families. Do a flush on one 463 // of the families during the load of edits so its seqid is not same as 464 // others to test we do right thing when different seqids. 465 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 466 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 467 long seqid = region.getOpenSeqNum(); 468 boolean first = true; 469 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 470 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 471 if (first) { 472 // If first, so we have at least one family w/ different seqid to rest. 473 region.flush(true); 474 first = false; 475 } 476 } 477 // Now assert edits made it in. 478 final Get g = new Get(rowName); 479 Result result = region.get(g); 480 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 481 // Now close the region (without flush), split the log, reopen the region and assert that 482 // replay of log has the correct effect, that our seqids are calculated correctly so 483 // all edits in logs are seen as 'stale'/old. 484 region.close(true); 485 wal.shutdown(); 486 runWALSplit(this.conf); 487 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 488 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 489 long seqid2 = region2.getOpenSeqNum(); 490 assertTrue(seqid + result.size() < seqid2); 491 final Result result1b = region2.get(g); 492 assertEquals(result.size(), result1b.size()); 493 494 // Next test. Add more edits, then 'crash' this region by stealing its wal 495 // out from under it and assert that replay of the log adds the edits back 496 // correctly when region is opened again. 497 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 498 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 499 } 500 // Get count of edits. 501 final Result result2 = region2.get(g); 502 assertEquals(2 * result.size(), result2.size()); 503 wal2.sync(); 504 final Configuration newConf = HBaseConfiguration.create(this.conf); 505 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 506 user.runAs(new PrivilegedExceptionAction<Object>() { 507 @Override 508 public Object run() throws Exception { 509 runWALSplit(newConf); 510 FileSystem newFS = FileSystem.get(newConf); 511 // Make a new wal for new region open. 512 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 513 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 514 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 515 @Override 516 protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) { 517 super.restoreEdit(s, cell, memstoreSizing); 518 countOfRestoredEdits.incrementAndGet(); 519 } 520 }; 521 long seqid3 = region3.initialize(); 522 Result result3 = region3.get(g); 523 // Assert that count of cells is same as before crash. 524 assertEquals(result2.size(), result3.size()); 525 assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get()); 526 527 // I can't close wal1. Its been appropriated when we split. 528 region3.close(); 529 wal3.close(); 530 return null; 531 } 532 }); 533 } 534 535 /** 536 * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores 537 * got flushed but others did not. 538 * <p/> 539 * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is 540 * by flushing at the region level, and then deleting the recently flushed store file for one of 541 * the Stores. This would put us back in the situation where all but that store got flushed and 542 * the region died. 543 * <p/> 544 * We restart Region again, and verify that the edits were replayed. 545 */ 546 @Test 547 public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException, 548 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 549 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 550 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 551 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 552 deleteDir(basedir); 553 final byte[] rowName = tableName.getName(); 554 final int countPerFamily = 10; 555 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 556 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 557 HBaseTestingUtil.closeRegionAndWAL(region3); 558 // Write countPerFamily edits into the three families. Do a flush on one 559 // of the families during the load of edits so its seqid is not same as 560 // others to test we do right thing when different seqids. 561 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 562 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 563 long seqid = region.getOpenSeqNum(); 564 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 565 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 566 } 567 568 // Now assert edits made it in. 569 final Get g = new Get(rowName); 570 Result result = region.get(g); 571 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 572 573 // Let us flush the region 574 region.flush(true); 575 region.close(true); 576 wal.shutdown(); 577 578 // delete the store files in the second column family to simulate a failure 579 // in between the flushcache(); 580 // we have 3 families. killing the middle one ensures that taking the maximum 581 // will make us fail. 582 int cf_count = 0; 583 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 584 cf_count++; 585 if (cf_count == 2) { 586 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 587 } 588 } 589 590 // Let us try to split and recover 591 runWALSplit(this.conf); 592 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 593 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 594 long seqid2 = region2.getOpenSeqNum(); 595 assertTrue(seqid + result.size() < seqid2); 596 597 final Result result1b = region2.get(g); 598 assertEquals(result.size(), result1b.size()); 599 } 600 601 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 602 // Only throws exception if throwExceptionWhenFlushing is set true. 603 public static class CustomStoreFlusher extends DefaultStoreFlusher { 604 // Switch between throw and not throw exception in flush 605 public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 606 607 public CustomStoreFlusher(Configuration conf, HStore store) { 608 super(conf, store); 609 } 610 611 @Override 612 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 613 MonitoredTask status, ThroughputController throughputController, 614 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 615 if (throwExceptionWhenFlushing.get()) { 616 throw new IOException("Simulated exception by tests"); 617 } 618 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 619 writerCreationTracker); 620 } 621 } 622 623 /** 624 * Test that we could recover the data correctly after aborting flush. In the test, first we abort 625 * flush after writing some data, then writing more data and flush again, at last verify the data. 626 */ 627 @Test 628 public void testReplayEditsAfterAbortingFlush() throws IOException { 629 final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush"); 630 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 631 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 632 deleteDir(basedir); 633 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 634 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 635 HBaseTestingUtil.closeRegionAndWAL(region3); 636 // Write countPerFamily edits into the three families. Do a flush on one 637 // of the families during the load of edits so its seqid is not same as 638 // others to test we do right thing when different seqids. 639 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 640 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 641 Mockito.doReturn(false).when(rsServices).isAborted(); 642 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 643 when(rsServices.getConfiguration()).thenReturn(conf); 644 Configuration customConf = new Configuration(this.conf); 645 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 646 CustomStoreFlusher.class.getName()); 647 HRegion region = 648 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 649 int writtenRowCount = 10; 650 List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies())); 651 for (int i = 0; i < writtenRowCount; i++) { 652 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 653 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 654 Bytes.toBytes("val")); 655 region.put(put); 656 } 657 658 // Now assert edits made it in. 659 RegionScanner scanner = region.getScanner(new Scan()); 660 assertEquals(writtenRowCount, getScannedCount(scanner)); 661 662 // Let us flush the region 663 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 664 try { 665 region.flush(true); 666 fail("Injected exception hasn't been thrown"); 667 } catch (IOException e) { 668 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 669 // simulated to abort server 670 Mockito.doReturn(true).when(rsServices).isAborted(); 671 region.setClosing(false); // region normally does not accept writes after 672 // DroppedSnapshotException. We mock around it for this test. 673 } 674 // writing more data 675 int moreRow = 10; 676 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 677 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 678 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 679 Bytes.toBytes("val")); 680 region.put(put); 681 } 682 writtenRowCount += moreRow; 683 // call flush again 684 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 685 try { 686 region.flush(true); 687 } catch (IOException t) { 688 LOG.info( 689 "Expected exception when flushing region because server is stopped," + t.getMessage()); 690 } 691 692 region.close(true); 693 wal.shutdown(); 694 695 // Let us try to split and recover 696 runWALSplit(this.conf); 697 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 698 Mockito.doReturn(false).when(rsServices).isAborted(); 699 HRegion region2 = 700 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 701 scanner = region2.getScanner(new Scan()); 702 assertEquals(writtenRowCount, getScannedCount(scanner)); 703 } 704 705 private int getScannedCount(RegionScanner scanner) throws IOException { 706 int scannedCount = 0; 707 List<Cell> results = new ArrayList<>(); 708 while (true) { 709 boolean existMore = scanner.next(results); 710 if (!results.isEmpty()) { 711 scannedCount++; 712 } 713 if (!existMore) { 714 break; 715 } 716 results.clear(); 717 } 718 return scannedCount; 719 } 720 721 /** 722 * Create an HRegion with the result of a WAL split and test we only see the good edits= 723 */ 724 @Test 725 public void testReplayEditsWrittenIntoWAL() throws Exception { 726 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 727 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 728 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 729 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 730 deleteDir(basedir); 731 732 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 733 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 734 HBaseTestingUtil.closeRegionAndWAL(region2); 735 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 736 final byte[] rowName = tableName.getName(); 737 final byte[] regionName = hri.getEncodedNameAsBytes(); 738 739 // Add 1k to each family. 740 final int countPerFamily = 1000; 741 Set<byte[]> familyNames = new HashSet<>(); 742 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 743 for (byte[] fam : htd.getColumnFamilyNames()) { 744 scopes.put(fam, 0); 745 } 746 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 747 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes); 748 familyNames.add(hcd.getName()); 749 } 750 751 // Add a cache flush, shouldn't have any effect 752 wal.startCacheFlush(regionName, familyNames); 753 wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM); 754 755 // Add an edit to another family, should be skipped. 756 WALEdit edit = new WALEdit(); 757 long now = ee.currentTime(); 758 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); 759 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 760 edit); 761 762 // Delete the c family to verify deletes make it over. 763 edit = new WALEdit(); 764 now = ee.currentTime(); 765 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 766 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 767 edit); 768 769 // Sync. 770 wal.sync(); 771 // Make a new conf and a new fs for the splitter to run on so we can take 772 // over old wal. 773 final Configuration newConf = HBaseConfiguration.create(this.conf); 774 User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime"); 775 user.runAs(new PrivilegedExceptionAction<Void>() { 776 @Override 777 public Void run() throws Exception { 778 runWALSplit(newConf); 779 FileSystem newFS = FileSystem.get(newConf); 780 // 100k seems to make for about 4 flushes during HRegion#initialize. 781 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 782 // Make a new wal for new region. 783 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 784 final AtomicInteger flushcount = new AtomicInteger(0); 785 try { 786 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 787 @Override 788 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 789 final Collection<HStore> storesToFlush, MonitoredTask status, 790 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 791 LOG.info("InternalFlushCache Invoked"); 792 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 793 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 794 flushcount.incrementAndGet(); 795 return fs; 796 } 797 }; 798 // The seq id this region has opened up with 799 long seqid = region.initialize(); 800 801 // The mvcc readpoint of from inserting data. 802 long writePoint = mvcc.getWritePoint(); 803 804 // We flushed during init. 805 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); 806 assertTrue((seqid - 1) == writePoint); 807 808 Get get = new Get(rowName); 809 Result result = region.get(get); 810 // Make sure we only see the good edits 811 assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size()); 812 region.close(); 813 } finally { 814 newWal.close(); 815 } 816 return null; 817 } 818 }); 819 } 820 821 @Test 822 // the following test is for HBASE-6065 823 public void testSequentialEditLogSeqNum() throws IOException { 824 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 825 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 826 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 827 deleteDir(basedir); 828 final byte[] rowName = tableName.getName(); 829 final int countPerFamily = 10; 830 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 831 832 // Mock the WAL 833 MockWAL wal = createMockWAL(); 834 835 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 836 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 837 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 838 } 839 840 // Let us flush the region 841 // But this time completeflushcache is not yet done 842 region.flush(true); 843 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 844 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 845 } 846 long lastestSeqNumber = region.getReadPoint(null); 847 // get the current seq no 848 wal.doCompleteCacheFlush = true; 849 // allow complete cache flush with the previous seq number got after first 850 // set of edits. 851 wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 852 wal.shutdown(); 853 FileStatus[] listStatus = wal.getFiles(); 854 assertNotNull(listStatus); 855 assertTrue(listStatus.length > 0); 856 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, 857 wals, null); 858 FileStatus[] listStatus1 = 859 this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName), 860 new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { 861 @Override 862 public boolean accept(Path p) { 863 return !WALSplitUtil.isSequenceIdFile(p); 864 } 865 }); 866 int editCount = 0; 867 for (FileStatus fileStatus : listStatus1) { 868 editCount = Integer.parseInt(fileStatus.getPath().getName()); 869 } 870 // The sequence number should be same 871 assertEquals( 872 "The sequence number of the recoverd.edits and the current edit seq should be same", 873 lastestSeqNumber, editCount); 874 } 875 876 /** 877 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 878 */ 879 @Test 880 public void testDatalossWhenInputError() throws Exception { 881 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 882 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 883 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 884 deleteDir(basedir); 885 final byte[] rowName = tableName.getName(); 886 final int countPerFamily = 10; 887 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 888 HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 889 Path regionDir = region1.getWALRegionDir(); 890 HBaseTestingUtil.closeRegionAndWAL(region1); 891 892 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 893 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 894 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 895 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 896 } 897 // Now assert edits made it in. 898 final Get g = new Get(rowName); 899 Result result = region.get(g); 900 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 901 // Now close the region (without flush), split the log, reopen the region and assert that 902 // replay of log has the correct effect. 903 region.close(true); 904 wal.shutdown(); 905 906 runWALSplit(this.conf); 907 908 // here we let the DFSInputStream throw an IOException just after the WALHeader. 909 Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); 910 FSDataInputStream stream = fs.open(editFile); 911 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); 912 Class<? extends AbstractFSWALProvider.Reader> logReaderClass = 913 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 914 AbstractFSWALProvider.Reader.class); 915 AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance(); 916 reader.init(this.fs, editFile, conf, stream); 917 final long headerLength = stream.getPos(); 918 reader.close(); 919 FileSystem spyFs = spy(this.fs); 920 doAnswer(new Answer<FSDataInputStream>() { 921 922 @Override 923 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 924 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 925 Field field = FilterInputStream.class.getDeclaredField("in"); 926 field.setAccessible(true); 927 final DFSInputStream in = (DFSInputStream) field.get(stream); 928 DFSInputStream spyIn = spy(in); 929 doAnswer(new Answer<Integer>() { 930 931 private long pos; 932 933 @Override 934 public Integer answer(InvocationOnMock invocation) throws Throwable { 935 if (pos >= headerLength) { 936 throw new IOException("read over limit"); 937 } 938 int b = (Integer) invocation.callRealMethod(); 939 if (b > 0) { 940 pos += b; 941 } 942 return b; 943 } 944 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 945 doAnswer(new Answer<Void>() { 946 947 @Override 948 public Void answer(InvocationOnMock invocation) throws Throwable { 949 invocation.callRealMethod(); 950 in.close(); 951 return null; 952 } 953 }).when(spyIn).close(); 954 field.set(stream, spyIn); 955 return stream; 956 } 957 }).when(spyFs).open(eq(editFile)); 958 959 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 960 HRegion region2; 961 try { 962 // log replay should fail due to the IOException, otherwise we may lose data. 963 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 964 assertEquals(result.size(), region2.get(g).size()); 965 } catch (IOException e) { 966 assertEquals("read over limit", e.getMessage()); 967 } 968 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 969 assertEquals(result.size(), region2.get(g).size()); 970 } 971 972 /** 973 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 974 */ 975 private void testNameConflictWhenSplit(boolean largeFirst) 976 throws IOException, StreamLacksCapabilityException { 977 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 978 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 979 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 980 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 981 deleteDir(basedir); 982 983 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 984 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 985 for (byte[] fam : htd.getColumnFamilyNames()) { 986 scopes.put(fam, 0); 987 } 988 HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 989 HBaseTestingUtil.closeRegionAndWAL(region); 990 final byte[] family = htd.getColumnFamilies()[0].getName(); 991 final byte[] rowName = tableName.getName(); 992 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 993 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 994 995 Path largeFile = new Path(logDir, "wal-1"); 996 Path smallFile = new Path(logDir, "wal-2"); 997 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 998 writerWALFile(smallFile, Arrays.asList(entry2)); 999 FileStatus first, second; 1000 if (largeFirst) { 1001 first = fs.getFileStatus(largeFile); 1002 second = fs.getFileStatus(smallFile); 1003 } else { 1004 first = fs.getFileStatus(smallFile); 1005 second = fs.getFileStatus(largeFile); 1006 } 1007 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); 1008 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); 1009 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 1010 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1011 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1012 assertEquals(2, region.get(new Get(rowName)).size()); 1013 } 1014 1015 @Test 1016 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1017 testNameConflictWhenSplit(true); 1018 } 1019 1020 @Test 1021 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1022 testNameConflictWhenSplit(false); 1023 } 1024 1025 static class MockWAL extends FSHLog { 1026 boolean doCompleteCacheFlush = false; 1027 1028 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1029 throws IOException { 1030 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1031 } 1032 1033 @Override 1034 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 1035 if (!doCompleteCacheFlush) { 1036 return; 1037 } 1038 super.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 1039 } 1040 } 1041 1042 private TableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1043 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1044 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1045 return builder.build(); 1046 } 1047 1048 private MockWAL createMockWAL() throws IOException { 1049 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1050 wal.init(); 1051 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1052 // long gone. 1053 HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1054 return wal; 1055 } 1056 1057 // Flusher used in this test. Keep count of how often we are called and 1058 // actually run the flush inside here. 1059 static class TestFlusher implements FlushRequester { 1060 private HRegion r; 1061 1062 @Override 1063 public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { 1064 try { 1065 r.flush(false); 1066 return true; 1067 } catch (IOException e) { 1068 throw new RuntimeException("Exception flushing", e); 1069 } 1070 } 1071 1072 @Override 1073 public boolean requestFlush(HRegion region, List<byte[]> families, 1074 FlushLifeCycleTracker tracker) { 1075 return true; 1076 } 1077 1078 @Override 1079 public boolean requestDelayedFlush(HRegion region, long when) { 1080 return true; 1081 } 1082 1083 @Override 1084 public void registerFlushRequestListener(FlushRequestListener listener) { 1085 1086 } 1087 1088 @Override 1089 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1090 return false; 1091 } 1092 1093 @Override 1094 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1095 1096 } 1097 } 1098 1099 private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri, 1100 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1101 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1102 } 1103 1104 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1105 int index) { 1106 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1107 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1108 WALEdit edit = new WALEdit(); 1109 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1110 return edit; 1111 } 1112 1113 private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence, 1114 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1115 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1116 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1117 createWALEdit(rowName, family, ee, index), hri, true, null); 1118 entry.stampRegionSequenceId(mvcc.begin()); 1119 return entry; 1120 } 1121 1122 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 1123 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1124 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) 1125 throws IOException { 1126 for (int j = 0; j < count; j++) { 1127 wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), 1128 createWALEdit(rowName, family, ee, j)); 1129 } 1130 wal.sync(); 1131 } 1132 1133 public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1134 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1135 List<Put> puts = new ArrayList<>(); 1136 for (int j = 0; j < count; j++) { 1137 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1138 Put p = new Put(rowName); 1139 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1140 r.put(p); 1141 puts.add(p); 1142 } 1143 return puts; 1144 } 1145 1146 /** 1147 * Creates an HRI around an HTD that has <code>tableName</code> and three column families named 1148 * 'a','b', and 'c'. 1149 * @param tableName Name of table to use when we create HTableDescriptor. 1150 */ 1151 private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1152 return RegionInfoBuilder.newBuilder(tableName).build(); 1153 } 1154 1155 /** 1156 * Run the split. Verify only single split file made. 1157 * @return The single split file made 1158 */ 1159 private Path runWALSplit(final Configuration c) throws IOException { 1160 List<Path> splits = 1161 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1162 // Split should generate only 1 file since there's only 1 region 1163 assertEquals("splits=" + splits, 1, splits.size()); 1164 // Make sure the file exists 1165 assertTrue(fs.exists(splits.get(0))); 1166 LOG.info("Split file=" + splits.get(0)); 1167 return splits.get(0); 1168 } 1169 1170 private TableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1171 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1172 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1173 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b"))); 1174 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c"))); 1175 return builder.build(); 1176 } 1177 1178 private void writerWALFile(Path file, List<FSWALEntry> entries) 1179 throws IOException, StreamLacksCapabilityException { 1180 fs.mkdirs(file.getParent()); 1181 ProtobufLogWriter writer = new ProtobufLogWriter(); 1182 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file), 1183 StreamSlowMonitor.create(conf, "testMonitor")); 1184 for (FSWALEntry entry : entries) { 1185 writer.append(entry); 1186 } 1187 writer.sync(false); 1188 writer.close(); 1189 } 1190 1191 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1192 throws IOException; 1193}