/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSizing;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test replay of edits out of a WAL split.
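 * <p/>
 * Subclasses supply the concrete {@link WAL} implementation via
 * {@link #createWAL(Configuration, Path, String)}.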
 */
public abstract class AbstractTestWALReplay {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path hbaseRootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  @Rule
  public final TestName currentTest = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The below config supported by 0.20-append and CDH3b2
    conf.setInt("dfs.client.block.recovery.retries", 2);
    TEST_UTIL.startMiniCluster(3);
    Path hbaseRootDir =
      TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    CommonFSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
    this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName = ServerName
      .valueOf(currentTest.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime())
      .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.hbaseRootDir, logName);
    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
    }
    this.wals = new WALFactory(conf, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
  }

  /*
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  /**
   * Tests replay of edits after a region with multiple column families has been moved between
   * region servers and the original server is aborted.
   */
  @Test
  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
    byte[] family1 = Bytes.toBytes("cf1");
    byte[] family2 = Bytes.toBytes("cf2");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("testV");
    byte[][] familys = { family1, family2 };
    TEST_UTIL.createTable(tableName, familys);
    Table htable = TEST_UTIL.getConnection().getTable(tableName);
    Put put = new Put(Bytes.toBytes("r1"));
    put.addColumn(family1, qualifier, value);
    htable.put(put);
    ResultScanner resultScanner = htable.getScanner(new Scan());
    int count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(1, count);

    SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
    List<HRegion> regions = hbaseCluster.getRegions(tableName);
    assertEquals(1, regions.size());

    // move region to another regionserver
    Region destRegion = regions.get(0);
    int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
    assertTrue("Please start more than 1 regionserver",
      hbaseCluster.getRegionServerThreads().size() > 1);
    int destServerNum = 0;
    while (destServerNum == originServerNum) {
      destServerNum++;
    }
    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
    // move region to destination regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName());

    // delete the row
    Delete del = new Delete(Bytes.toBytes("r1"));
    htable.delete(del);
    resultScanner = htable.getScanner(new Scan());
    count = 0;
    while (resultScanner.next() != null) {
      count++;
    }
    resultScanner.close();
    assertEquals(0, count);

    // flush region and make major compaction
    HRegion region =
      (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
    region.flush(true);
    // wait to complete major compaction
    for (HStore store : region.getStores()) {
      store.triggerMajorCompaction();
    }
    region.compact(true);

    // move region back to the origin regionserver
    TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName());
    // abort the origin regionserver
    originServer.abort("testing");

    // see what we get
    Result result = htable.get(new Get(Bytes.toBytes("r1")));
    if (result != null) {
      assertTrue("Row is deleted, but we get " + result.toString(),
        (result == null) || result.isEmpty());
    }
    resultScanner.close();
  }

  /**
   * Tests for hbase-2727.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
   */
  @Test
  public void test2727() throws Exception {
    // Test being able to have > 1 set of edits in the recovered.edits directory.
    // Ensure edits are replayed properly.
    final TableName tableName = TableName.valueOf("test2727");

    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName);
    Region region2 =
      HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    final byte[] rowName = tableName.getName();

    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    final int countPerFamily = 1000;

    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : tableDescriptor.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1,
        mvcc, scopes);
    }
    wal1.shutdown();
    runWALSplit(this.conf);

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    // Add 1k to each family.
    for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2,
        mvcc, scopes);
    }
    wal2.shutdown();
    runWALSplit(this.conf);

    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
    try {
      HRegion region =
        HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3);
      long seqid = region.getOpenSeqNum();
      // The region opens with sequenceId 1. With 6k edits, its sequence number reaches 6k + 1.
      // When opened, this region would apply 6k edits, and increment the sequenceId by 1
      assertTrue(seqid > mvcc.getWritePoint());
      assertEquals(seqid - 1, mvcc.getWritePoint());
      LOG.debug(
        "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint());

      // TODO: Scan all.
      region.close();
    } finally {
      wal3.close();
    }
  }

  /**
   * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'.
   */
  @Test
  public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    byte[] family = htd.getColumnFamilies()[0].getName();
    Path f = new Path(basedir, "hfile");
    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
      Bytes.toBytes("z"), 10);
    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    hfs.add(Pair.newPair(family, f.toString()));
    region.bulkLoadHFiles(hfs, true, null);

    // Add an edit so something is in the WAL
    byte[] row = tableName.getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();
    final int rowsInsertedCount = 11;

    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1. It has been appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
   * files) and an edit in the memstore.
   * <p/>
   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from
   * being replayed"
   */
  @Test
  public void testCompactedBulkLoadedFiles() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);

    // Add an edit so something is in the WAL
    byte[] row = tableName.getName();
    byte[] family = htd.getColumnFamilies()[0].getName();
    region.put((new Put(row)).addColumn(family, family, family));
    wal.sync();

    List<Pair<byte[], String>> hfs = new ArrayList<>(1);
    for (int i = 0; i < 3; i++) {
      Path f = new Path(basedir, "hfile" + i);
      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
        Bytes.toBytes(i + "50"), 10);
      hfs.add(Pair.newPair(family, f.toString()));
    }
    region.bulkLoadHFiles(hfs, true, null);
    final int rowsInsertedCount = 31;
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // major compact to turn all the bulk loaded files into one normal file
    region.compact(true);
    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));

    // Now 'crash' the region by stealing its wal
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);

        HRegion region2 =
          HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2);
        long seqid2 = region2.getOpenSeqNum();
        assertTrue(seqid2 > -1);
        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));

        // We can't close wal1. It has been appropriated when we split.
        region2.close();
        wal2.close();
        return null;
      }
    });
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify
   * seqids.
   */
  @Test
  public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others, to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // If first, flush so we have at least one family with a different seqid from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
          @Override
          protected void restoreEdit(HStore s, Cell cell, MemStoreSizing memstoreSizing) {
            super.restoreEdit(s, cell, memstoreSizing);
            countOfRestoredEdits.incrementAndGet();
          }
        };
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());
        assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get());

        // We can't close wal1. It has been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the flushes, i.e. some
   * stores got flushed but others did not.
   * <p/>
   * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is
   * by flushing at the region level, and then deleting the recently flushed store file for one of
   * the Stores. This would put us back in the situation where all but that store got flushed and
   * the region died.
   * <p/>
   * We restart Region again, and verify that the edits were replayed.
   */
  @Test
  public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException,
    IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others, to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache();
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
      }
    }

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
  // Only throws exception if throwExceptionWhenFlushing is set true.
  public static class CustomStoreFlusher extends DefaultStoreFlusher {
    // Switch between throw and not throw exception in flush
    public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);

    public CustomStoreFlusher(Configuration conf, HStore store) {
      super(conf, store);
    }

    @Override
    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
      if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
      }
      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
        writerCreationTracker);
    }
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the test, we first
   * abort a flush after writing some data, then write more data and flush again, and finally
   * verify the data.
   */
  @Test
  public void testReplayEditsAfterAbortingFlush() throws IOException {
    final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName);
    deleteDir(basedir);
    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region3);
    // Write edits across the three families; the first flush below is made to fail so we can
    // verify replay after an aborted flush.
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
      CustomStoreFlusher.class.getName());
    HRegion region =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(htd.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulated to abort server
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // writing more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
        Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info(
        "Expected exception when flushing region because server is stopped, " + t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    runWALSplit(this.conf);
    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 =
      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  /**
   * Create an HRegion with the result of a WAL split and test we only see the good edits.
   */
  @Test
  public void testReplayEditsWrittenIntoWAL() throws Exception {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final TableDescriptor htd = createBasic3FamilyHTD(tableName);
    HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region2);
    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    final byte[] rowName = tableName.getName();
    final byte[] regionName = hri.getEncodedNameAsBytes();

    // Add 1k to each family.
    final int countPerFamily = 1000;
    Set<byte[]> familyNames = new HashSet<>();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes);
      familyNames.add(hcd.getName());
    }

    // Add a cache flush, shouldn't have any effect
    wal.startCacheFlush(regionName, familyNames);
    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);

    // Add an edit to another family, should be skipped.
    WALEdit edit = new WALEdit();
    long now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Delete the c family to verify deletes make it over.
    edit = new WALEdit();
    now = ee.currentTime();
    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
    wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes),
      edit);

    // Sync.
    wal.sync();
    // Make a new conf and a new fs for the splitter to run on so we can take
    // over the old wal.
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime");
    user.runAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        runWALSplit(newConf);
        FileSystem newFS = FileSystem.get(newConf);
        // 100k seems to make for about 4 flushes during HRegion#initialize.
        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
        // Make a new wal for the new region.
        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
        final AtomicInteger flushcount = new AtomicInteger(0);
        try {
          final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
            @Override
            protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
              final Collection<HStore> storesToFlush, MonitoredTask status,
              boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
              LOG.info("InternalFlushCache Invoked");
              FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
              flushcount.incrementAndGet();
              return fs;
            }
          };
          // The seq id this region has opened up with
          long seqid = region.initialize();

          // The mvcc write point after inserting the data.
          long writePoint = mvcc.getWritePoint();

          // We flushed during init.
          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
          assertTrue((seqid - 1) == writePoint);

          Get get = new Get(rowName);
          Result result = region.get(get);
          // Make sure we only see the good edits
          assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size());
          region.close();
        } finally {
          newWal.close();
        }
        return null;
      }
    });
  }

  @Test
  // the following test is for HBASE-6065
  public void testSequentialEditLogSeqNum() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic1FamilyHTD(tableName);

    // Mock the WAL
    MockWAL wal = createMockWAL();

    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Let us flush the region
    // But this time completeflushcache is not yet done
    region.flush(true);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
    }
    long latestSeqNumber = region.getReadPoint(null);
    // get the current seq no
    wal.doCompleteCacheFlush = true;
    // allow complete cache flush with the previous seq number got after first
    // set of edits.
    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
    wal.shutdown();
    FileStatus[] listStatus = wal.getFiles();
    assertNotNull(listStatus);
    assertTrue(listStatus.length > 0);
    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
      wals, null);
    FileStatus[] listStatus1 =
      this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName),
        new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() {
          @Override
          public boolean accept(Path p) {
            return !WALSplitUtil.isSequenceIdFile(p);
          }
        });
    int editCount = 0;
    for (FileStatus fileStatus : listStatus1) {
      editCount = Integer.parseInt(fileStatus.getPath().getName());
    }
    // The sequence number should be the same
    assertEquals(
      "The sequence number of the recovered.edits and the current edit seq should be same",
      latestSeqNumber, editCount);
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-15252.
   */
  @Test
  public void testDatalossWhenInputError() throws Exception {
    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName);
    deleteDir(basedir);
    final byte[] rowName = tableName.getName();
    final int countPerFamily = 10;
    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
    HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    Path regionDir = region1.getWALRegionDir();
    HBaseTestingUtil.closeRegionAndWAL(region1);

    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
    }
    // Now assert edits made it in.
    final Get g = new Get(rowName);
    Result result = region.get(g);
    assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect.
    region.close(true);
    wal.shutdown();

    runWALSplit(this.conf);

    // here we let the DFSInputStream throw an IOException just after the WALHeader.
    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
    FSDataInputStream stream = fs.open(editFile);
    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
      conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
        AbstractFSWALProvider.Reader.class);
    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
    reader.init(this.fs, editFile, conf, stream);
    final long headerLength = stream.getPos();
    reader.close();
    FileSystem spyFs = spy(this.fs);
    doAnswer(new Answer<FSDataInputStream>() {

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
        Field field = FilterInputStream.class.getDeclaredField("in");
        field.setAccessible(true);
        final DFSInputStream in = (DFSInputStream) field.get(stream);
        DFSInputStream spyIn = spy(in);
        doAnswer(new Answer<Integer>() {

          private long pos;

          @Override
          public Integer answer(InvocationOnMock invocation) throws Throwable {
            if (pos >= headerLength) {
              throw new IOException("read over limit");
            }
            int b = (Integer) invocation.callRealMethod();
            if (b > 0) {
              pos += b;
            }
            return b;
          }
        }).when(spyIn).read(any(byte[].class), anyInt(), anyInt());
        doAnswer(new Answer<Void>() {

          @Override
          public Void answer(InvocationOnMock invocation) throws Throwable {
            invocation.callRealMethod();
            in.close();
            return null;
          }
        }).when(spyIn).close();
        field.set(stream, spyIn);
        return stream;
      }
    }).when(spyFs).open(eq(editFile));

    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
    HRegion region2;
    try {
      // log replay should fail due to the IOException, otherwise we may lose data.
      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
      assertEquals(result.size(), region2.get(g).size());
    } catch (IOException e) {
      assertEquals("read over limit", e.getMessage());
    }
    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
    assertEquals(result.size(), region2.get(g).size());
  }

  /**
   * Test case for https://issues.apache.org/jira/browse/HBASE-14949.
   */
  private void testNameConflictWhenSplit(boolean largeFirst)
    throws IOException, StreamLacksCapabilityException {
    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
    final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName);
    deleteDir(basedir);

    final TableDescriptor htd = createBasic1FamilyHTD(tableName);
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getColumnFamilyNames()) {
      scopes.put(fam, 0);
    }
    HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
    HBaseTestingUtil.closeRegionAndWAL(region);
    final byte[] family = htd.getColumnFamilies()[0].getName();
    final byte[] rowName = tableName.getName();
    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);

    Path largeFile = new Path(logDir, "wal-1");
    Path smallFile = new Path(logDir, "wal-2");
    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
    writerWALFile(smallFile, Arrays.asList(entry2));
    FileStatus first, second;
    if (largeFirst) {
      first = fs.getFileStatus(largeFile);
      second = fs.getFileStatus(smallFile);
    } else {
      first = fs.getFileStatus(smallFile);
      second = fs.getFileStatus(largeFile);
    }
    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
    assertEquals(2, region.get(new Get(rowName)).size());
  }

  @Test
  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(true);
  }

  @Test
  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
    testNameConflictWhenSplit(false);
  }

  static class MockWAL extends FSHLog {
    boolean doCompleteCacheFlush = false;

    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
      throws IOException {
      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
    }

    @Override
    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
      if (!doCompleteCacheFlush) {
        return;
      }
      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
    }
  }

  private TableDescriptor createBasic1FamilyHTD(final TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
    return builder.build();
  }

  private MockWAL createMockWAL() throws IOException {
    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
    wal.init();
    // Set down maximum recovery so the dfsclient doesn't linger retrying something
    // long gone.
    HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
    return wal;
  }

  // Flusher used in this test. Keep count of how often we are called and
  // actually run the flush inside here.
  static class TestFlusher implements FlushRequester {
    private HRegion r;

    @Override
    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
      try {
        r.flush(false);
        return true;
      } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
      }
    }

    @Override
    public boolean requestFlush(HRegion region, List<byte[]> families,
      FlushLifeCycleTracker tracker) {
      return true;
    }

    @Override
    public boolean requestDelayedFlush(HRegion region, long when) {
      return true;
    }

    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {

    }

    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
      return false;
    }

    @Override
    public void setGlobalMemStoreLimit(long globalMemStoreSize) {

    }
  }

  private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
  }

  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
    int index) {
    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
    return edit;
  }

  private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence,
    byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
    int index, NavigableMap<byte[], Integer> scopes) throws IOException {
    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
      createWALEdit(rowName, family, ee, index), hri, true, null);
    entry.stampRegionSequenceId(mvcc.begin());
    return entry;
  }

  private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
    final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
    throws IOException {
    for (int j = 0; j < count; j++) {
      wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes),
        createWALEdit(rowName, family, ee, j));
    }
    wal.sync();
  }

  public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
    EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
    List<Put> puts = new ArrayList<>();
    for (int j = 0; j < count; j++) {
      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
      Put p = new Put(rowName);
      p.addColumn(family, qualifier, ee.currentTime(), rowName);
      r.put(p);
      puts.add(p);
    }
    return puts;
  }

  /**
   * Creates an HRI around an HTD that has <code>tableName</code> and three column families named
   * 'a', 'b', and 'c'.
   * @param tableName Name of table to use when we create HTableDescriptor.
   */
  private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
    return RegionInfoBuilder.newBuilder(tableName).build();
  }

  /**
   * Run the split. Verify only a single split file was made.
   * @return The single split file made
   */
  private Path runWALSplit(final Configuration c) throws IOException {
    List<Path> splits =
      WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
    // Split should generate only 1 file since there's only 1 region
    assertEquals("splits=" + splits, 1, splits.size());
    // Make sure the file exists
    assertTrue(fs.exists(splits.get(0)));
    LOG.info("Split file=" + splits.get(0));
    return splits.get(0);
  }

  private TableDescriptor createBasic3FamilyHTD(final TableName tableName) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a")));
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b")));
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c")));
    return builder.build();
  }

  private void writerWALFile(Path file, List<FSWALEntry> entries)
    throws IOException, StreamLacksCapabilityException {
    fs.mkdirs(file.getParent());
    ProtobufLogWriter writer = new ProtobufLogWriter();
    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
      StreamSlowMonitor.create(conf, "testMonitor"));
    for (FSWALEntry entry : entries) {
      writer.append(entry);
    }
    writer.sync(false);
    writer.close();
  }

  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
    throws IOException;
}