001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyInt; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.spy; 029import static org.mockito.Mockito.when; 030 031import java.io.FilterInputStream; 032import java.io.IOException; 033import java.lang.reflect.Field; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.HashSet; 039import java.util.List; 040import java.util.NavigableMap; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicInteger; 045import java.util.function.Consumer; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.ExtendedCell; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HBaseTestingUtil; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.KeyValue; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.RegionInfo; 067import org.apache.hadoop.hbase.client.RegionInfoBuilder; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.client.ResultScanner; 070import org.apache.hadoop.hbase.client.Scan; 071import org.apache.hadoop.hbase.client.Table; 072import org.apache.hadoop.hbase.client.TableDescriptor; 073import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 074import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 075import org.apache.hadoop.hbase.monitoring.MonitoredTask; 076import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 077import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 078import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 079import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 080import org.apache.hadoop.hbase.regionserver.FlushRequester; 081import org.apache.hadoop.hbase.regionserver.HRegion; 082import org.apache.hadoop.hbase.regionserver.HRegionServer; 083import org.apache.hadoop.hbase.regionserver.HStore; 084import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 085import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 086import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 087import org.apache.hadoop.hbase.regionserver.Region; 088import org.apache.hadoop.hbase.regionserver.RegionScanner; 089import org.apache.hadoop.hbase.regionserver.RegionServerServices; 090import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 091import org.apache.hadoop.hbase.security.User; 092import org.apache.hadoop.hbase.util.Bytes; 093import org.apache.hadoop.hbase.util.CommonFSUtils; 094import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 095import org.apache.hadoop.hbase.util.EnvironmentEdge; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.HFileTestUtil; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 100import org.apache.hadoop.hbase.wal.WAL; 101import org.apache.hadoop.hbase.wal.WALEdit; 102import org.apache.hadoop.hbase.wal.WALFactory; 103import org.apache.hadoop.hbase.wal.WALKeyImpl; 104import org.apache.hadoop.hbase.wal.WALSplitUtil; 105import org.apache.hadoop.hbase.wal.WALSplitter; 106import org.apache.hadoop.hbase.wal.WALStreamReader; 107import org.apache.hadoop.hdfs.DFSInputStream; 108import org.junit.After; 109import org.junit.AfterClass; 110import org.junit.Before; 111import org.junit.BeforeClass; 112import org.junit.Rule; 113import org.junit.Test; 114import org.junit.rules.TestName; 115import org.mockito.Mockito; 116import org.mockito.invocation.InvocationOnMock; 117import org.mockito.stubbing.Answer; 118import org.slf4j.Logger; 119import org.slf4j.LoggerFactory; 120 121/** 122 * Test replay of edits out of a WAL split. 123 */ 124public abstract class AbstractTestWALReplay { 125 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 126 static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 127 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 128 private Path hbaseRootDir = null; 129 private String logName; 130 private Path oldLogDir; 131 private Path logDir; 132 private FileSystem fs; 133 private Configuration conf; 134 private WALFactory wals; 135 136 @Rule 137 public final TestName currentTest = new TestName(); 138 139 @BeforeClass 140 public static void setUpBeforeClass() throws Exception { 141 Configuration conf = TEST_UTIL.getConfiguration(); 142 // The below config supported by 0.20-append and CDH3b2 143 conf.setInt("dfs.client.block.recovery.retries", 2); 144 TEST_UTIL.startMiniCluster(3); 145 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 146 LOG.info("hbase.rootdir=" + hbaseRootDir); 147 CommonFSUtils.setRootDir(conf, hbaseRootDir); 148 } 149 150 @AfterClass 151 public static void tearDownAfterClass() throws Exception { 152 TEST_UTIL.shutdownMiniCluster(); 153 } 154 155 @Before 156 public void setUp() throws Exception { 157 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 158 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 159 this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf); 160 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 161 String serverName = ServerName 162 .valueOf(currentTest.getMethodName() + "-manual", 16010, EnvironmentEdgeManager.currentTime()) 163 .toString(); 164 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 165 this.logDir = new Path(this.hbaseRootDir, logName); 166 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 167 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 168 } 169 this.wals = new WALFactory(conf, currentTest.getMethodName()); 170 } 171 172 @After 173 public void tearDown() throws Exception { 174 this.wals.close(); 175 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 176 } 177 178 /* 179 * @param p Directory to cleanup 180 */ 181 private void deleteDir(final Path p) throws IOException { 182 if (this.fs.exists(p)) { 183 if (!this.fs.delete(p, true)) { 184 throw new IOException("Failed remove of " + p); 185 } 186 } 187 } 188 189 /** 190 * */ 191 @Test 192 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 193 final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 194 byte[] family1 = Bytes.toBytes("cf1"); 195 byte[] family2 = Bytes.toBytes("cf2"); 196 byte[] qualifier = Bytes.toBytes("q"); 197 byte[] value = Bytes.toBytes("testV"); 198 byte[][] familys = { family1, family2 }; 199 TEST_UTIL.createTable(tableName, familys); 200 Table htable = TEST_UTIL.getConnection().getTable(tableName); 201 Put put = new Put(Bytes.toBytes("r1")); 202 put.addColumn(family1, qualifier, value); 203 htable.put(put); 204 ResultScanner resultScanner = htable.getScanner(new Scan()); 205 int count = 0; 206 while (resultScanner.next() != null) { 207 count++; 208 } 209 resultScanner.close(); 210 assertEquals(1, count); 211 212 SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 213 List<HRegion> regions = hbaseCluster.getRegions(tableName); 214 assertEquals(1, regions.size()); 215 216 // move region to another regionserver 217 Region destRegion = regions.get(0); 218 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 219 assertTrue("Please start more than 1 regionserver", 220 hbaseCluster.getRegionServerThreads().size() > 1); 221 int destServerNum = 0; 222 while (destServerNum == originServerNum) { 223 destServerNum++; 224 } 225 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 226 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 227 // move region to destination regionserver 228 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 229 230 // delete the row 231 Delete del = new Delete(Bytes.toBytes("r1")); 232 htable.delete(del); 233 resultScanner = htable.getScanner(new Scan()); 234 count = 0; 235 while (resultScanner.next() != null) { 236 count++; 237 } 238 resultScanner.close(); 239 assertEquals(0, count); 240 241 // flush region and make major compaction 242 HRegion region = 243 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 244 region.flush(true); 245 // wait to complete major compaction 246 for (HStore store : region.getStores()) { 247 store.triggerMajorCompaction(); 248 } 249 region.compact(true); 250 251 // move region to origin regionserver 252 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 253 // abort the origin regionserver 254 originServer.abort("testing"); 255 256 // see what we get 257 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 258 if (result != null) { 259 assertTrue("Row is deleted, but we get" + result.toString(), 260 (result == null) || result.isEmpty()); 261 } 262 resultScanner.close(); 263 } 264 265 /** 266 * Tests for hbase-2727. 267 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 268 */ 269 @Test 270 public void test2727() throws Exception { 271 // Test being able to have > 1 set of edits in the recovered.edits directory. 272 // Ensure edits are replayed properly. 273 final TableName tableName = TableName.valueOf("test2727"); 274 275 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 276 RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 277 Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 278 deleteDir(basedir); 279 280 TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName); 281 Region region2 = 282 HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor); 283 HBaseTestingUtil.closeRegionAndWAL(region2); 284 final byte[] rowName = tableName.getName(); 285 286 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 287 // Add 1k to each family. 288 final int countPerFamily = 1000; 289 290 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 291 for (byte[] fam : tableDescriptor.getColumnFamilyNames()) { 292 scopes.put(fam, 0); 293 } 294 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 295 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1, 296 mvcc, scopes); 297 } 298 wal1.shutdown(); 299 runWALSplit(this.conf); 300 301 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 302 // Add 1k to each family. 303 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 304 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2, 305 mvcc, scopes); 306 } 307 wal2.shutdown(); 308 runWALSplit(this.conf); 309 310 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 311 try { 312 HRegion region = 313 HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3); 314 long seqid = region.getOpenSeqNum(); 315 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 316 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 317 assertTrue(seqid > mvcc.getWritePoint()); 318 assertEquals(seqid - 1, mvcc.getWritePoint()); 319 LOG.debug( 320 "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint()); 321 322 // TODO: Scan all. 323 region.close(); 324 } finally { 325 wal3.close(); 326 } 327 } 328 329 /** 330 * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'. 331 */ 332 @Test 333 public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException, 334 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 335 final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 336 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 337 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 338 deleteDir(basedir); 339 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 340 Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 341 HBaseTestingUtil.closeRegionAndWAL(region2); 342 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 343 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 344 345 byte[] family = htd.getColumnFamilies()[0].getName(); 346 Path f = new Path(basedir, "hfile"); 347 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 348 Bytes.toBytes("z"), 10); 349 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 350 hfs.add(Pair.newPair(family, f.toString())); 351 region.bulkLoadHFiles(hfs, true, null); 352 353 // Add an edit so something in the WAL 354 byte[] row = tableName.getName(); 355 region.put((new Put(row)).addColumn(family, family, family)); 356 wal.sync(); 357 final int rowsInsertedCount = 11; 358 359 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 360 361 // Now 'crash' the region by stealing its wal 362 final Configuration newConf = HBaseConfiguration.create(this.conf); 363 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 364 user.runAs(new PrivilegedExceptionAction() { 365 @Override 366 public Object run() throws Exception { 367 runWALSplit(newConf); 368 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 369 370 HRegion region2 = 371 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 372 long seqid2 = region2.getOpenSeqNum(); 373 assertTrue(seqid2 > -1); 374 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 375 376 // I can't close wal1. Its been appropriated when we split. 377 region2.close(); 378 wal2.close(); 379 return null; 380 } 381 }); 382 } 383 384 /** 385 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 386 * files) and an edit in the memstore. 387 * <p/> 388 * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from 389 * being replayed" 390 */ 391 @Test 392 public void testCompactedBulkLoadedFiles() throws IOException, SecurityException, 393 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 394 final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles"); 395 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 396 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 397 deleteDir(basedir); 398 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 399 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 400 HBaseTestingUtil.closeRegionAndWAL(region2); 401 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 402 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 403 404 // Add an edit so something in the WAL 405 byte[] row = tableName.getName(); 406 byte[] family = htd.getColumnFamilies()[0].getName(); 407 region.put((new Put(row)).addColumn(family, family, family)); 408 wal.sync(); 409 410 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 411 for (int i = 0; i < 3; i++) { 412 Path f = new Path(basedir, "hfile" + i); 413 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 414 Bytes.toBytes(i + "50"), 10); 415 hfs.add(Pair.newPair(family, f.toString())); 416 } 417 region.bulkLoadHFiles(hfs, true, null); 418 final int rowsInsertedCount = 31; 419 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 420 421 // major compact to turn all the bulk loaded files into one normal file 422 region.compact(true); 423 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 424 425 // Now 'crash' the region by stealing its wal 426 final Configuration newConf = HBaseConfiguration.create(this.conf); 427 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 428 user.runAs(new PrivilegedExceptionAction() { 429 @Override 430 public Object run() throws Exception { 431 runWALSplit(newConf); 432 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 433 434 HRegion region2 = 435 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 436 long seqid2 = region2.getOpenSeqNum(); 437 assertTrue(seqid2 > -1); 438 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 439 440 // I can't close wal1. Its been appropriated when we split. 441 region2.close(); 442 wal2.close(); 443 return null; 444 } 445 }); 446 } 447 448 /** 449 * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify 450 * seqids. 451 */ 452 @Test 453 public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException, 454 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 455 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 456 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 457 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 458 deleteDir(basedir); 459 final byte[] rowName = tableName.getName(); 460 final int countPerFamily = 10; 461 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 462 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 463 HBaseTestingUtil.closeRegionAndWAL(region3); 464 // Write countPerFamily edits into the three families. Do a flush on one 465 // of the families during the load of edits so its seqid is not same as 466 // others to test we do right thing when different seqids. 467 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 468 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 469 long seqid = region.getOpenSeqNum(); 470 boolean first = true; 471 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 472 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 473 if (first) { 474 // If first, so we have at least one family w/ different seqid to rest. 475 region.flush(true); 476 first = false; 477 } 478 } 479 // Now assert edits made it in. 480 final Get g = new Get(rowName); 481 Result result = region.get(g); 482 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 483 // Now close the region (without flush), split the log, reopen the region and assert that 484 // replay of log has the correct effect, that our seqids are calculated correctly so 485 // all edits in logs are seen as 'stale'/old. 486 region.close(true); 487 wal.shutdown(); 488 runWALSplit(this.conf); 489 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 490 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 491 long seqid2 = region2.getOpenSeqNum(); 492 assertTrue(seqid + result.size() < seqid2); 493 final Result result1b = region2.get(g); 494 assertEquals(result.size(), result1b.size()); 495 496 // Next test. Add more edits, then 'crash' this region by stealing its wal 497 // out from under it and assert that replay of the log adds the edits back 498 // correctly when region is opened again. 499 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 500 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 501 } 502 // Get count of edits. 503 final Result result2 = region2.get(g); 504 assertEquals(2 * result.size(), result2.size()); 505 wal2.sync(); 506 final Configuration newConf = HBaseConfiguration.create(this.conf); 507 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 508 user.runAs(new PrivilegedExceptionAction<Object>() { 509 @Override 510 public Object run() throws Exception { 511 runWALSplit(newConf); 512 FileSystem newFS = FileSystem.get(newConf); 513 // Make a new wal for new region open. 514 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 515 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 516 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 517 @Override 518 protected void restoreEdit(HStore s, ExtendedCell cell, MemStoreSizing memstoreSizing) { 519 super.restoreEdit(s, cell, memstoreSizing); 520 countOfRestoredEdits.incrementAndGet(); 521 } 522 }; 523 long seqid3 = region3.initialize(); 524 Result result3 = region3.get(g); 525 // Assert that count of cells is same as before crash. 526 assertEquals(result2.size(), result3.size()); 527 assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get()); 528 529 // I can't close wal1. Its been appropriated when we split. 530 region3.close(); 531 wal3.close(); 532 return null; 533 } 534 }); 535 } 536 537 /** 538 * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores 539 * got flushed but others did not. 540 * <p/> 541 * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is 542 * by flushing at the region level, and then deleting the recently flushed store file for one of 543 * the Stores. This would put us back in the situation where all but that store got flushed and 544 * the region died. 545 * <p/> 546 * We restart Region again, and verify that the edits were replayed. 547 */ 548 @Test 549 public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException, 550 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 551 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 552 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 553 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 554 deleteDir(basedir); 555 final byte[] rowName = tableName.getName(); 556 final int countPerFamily = 10; 557 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 558 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 559 HBaseTestingUtil.closeRegionAndWAL(region3); 560 // Write countPerFamily edits into the three families. Do a flush on one 561 // of the families during the load of edits so its seqid is not same as 562 // others to test we do right thing when different seqids. 563 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 564 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 565 long seqid = region.getOpenSeqNum(); 566 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 567 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 568 } 569 570 // Now assert edits made it in. 571 final Get g = new Get(rowName); 572 Result result = region.get(g); 573 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 574 575 // Let us flush the region 576 region.flush(true); 577 region.close(true); 578 wal.shutdown(); 579 580 // delete the store files in the second column family to simulate a failure 581 // in between the flushcache(); 582 // we have 3 families. killing the middle one ensures that taking the maximum 583 // will make us fail. 584 int cf_count = 0; 585 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 586 cf_count++; 587 if (cf_count == 2) { 588 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 589 } 590 } 591 592 // Let us try to split and recover 593 runWALSplit(this.conf); 594 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 595 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 596 long seqid2 = region2.getOpenSeqNum(); 597 assertTrue(seqid + result.size() < seqid2); 598 599 final Result result1b = region2.get(g); 600 assertEquals(result.size(), result1b.size()); 601 } 602 603 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 604 // Only throws exception if throwExceptionWhenFlushing is set true. 605 public static class CustomStoreFlusher extends DefaultStoreFlusher { 606 // Switch between throw and not throw exception in flush 607 public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 608 609 public CustomStoreFlusher(Configuration conf, HStore store) { 610 super(conf, store); 611 } 612 613 @Override 614 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 615 MonitoredTask status, ThroughputController throughputController, 616 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 617 if (throwExceptionWhenFlushing.get()) { 618 throw new IOException("Simulated exception by tests"); 619 } 620 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 621 writerCreationTracker); 622 } 623 } 624 625 /** 626 * Test that we could recover the data correctly after aborting flush. In the test, first we abort 627 * flush after writing some data, then writing more data and flush again, at last verify the data. 628 */ 629 @Test 630 public void testReplayEditsAfterAbortingFlush() throws IOException { 631 final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush"); 632 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 633 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 634 deleteDir(basedir); 635 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 636 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 637 HBaseTestingUtil.closeRegionAndWAL(region3); 638 // Write countPerFamily edits into the three families. Do a flush on one 639 // of the families during the load of edits so its seqid is not same as 640 // others to test we do right thing when different seqids. 641 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 642 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 643 Mockito.doReturn(false).when(rsServices).isAborted(); 644 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 645 when(rsServices.getConfiguration()).thenReturn(conf); 646 Configuration customConf = new Configuration(this.conf); 647 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 648 CustomStoreFlusher.class.getName()); 649 HRegion region = 650 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 651 int writtenRowCount = 10; 652 List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies())); 653 for (int i = 0; i < writtenRowCount; i++) { 654 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 655 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 656 Bytes.toBytes("val")); 657 region.put(put); 658 } 659 660 // Now assert edits made it in. 661 RegionScanner scanner = region.getScanner(new Scan()); 662 assertEquals(writtenRowCount, getScannedCount(scanner)); 663 664 // Let us flush the region 665 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 666 try { 667 region.flush(true); 668 fail("Injected exception hasn't been thrown"); 669 } catch (IOException e) { 670 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 671 // simulated to abort server 672 Mockito.doReturn(true).when(rsServices).isAborted(); 673 region.setClosing(false); // region normally does not accept writes after 674 // DroppedSnapshotException. We mock around it for this test. 675 } 676 // writing more data 677 int moreRow = 10; 678 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 679 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 680 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 681 Bytes.toBytes("val")); 682 region.put(put); 683 } 684 writtenRowCount += moreRow; 685 // call flush again 686 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 687 try { 688 region.flush(true); 689 } catch (IOException t) { 690 LOG.info( 691 "Expected exception when flushing region because server is stopped," + t.getMessage()); 692 } 693 694 region.close(true); 695 wal.shutdown(); 696 697 // Let us try to split and recover 698 runWALSplit(this.conf); 699 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 700 Mockito.doReturn(false).when(rsServices).isAborted(); 701 HRegion region2 = 702 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 703 scanner = region2.getScanner(new Scan()); 704 assertEquals(writtenRowCount, getScannedCount(scanner)); 705 } 706 707 private int getScannedCount(RegionScanner scanner) throws IOException { 708 int scannedCount = 0; 709 List<Cell> results = new ArrayList<>(); 710 while (true) { 711 boolean existMore = scanner.next(results); 712 if (!results.isEmpty()) { 713 scannedCount++; 714 } 715 if (!existMore) { 716 break; 717 } 718 results.clear(); 719 } 720 return scannedCount; 721 } 722 723 /** 724 * Create an HRegion with the result of a WAL split and test we only see the good edits= 725 */ 726 @Test 727 public void testReplayEditsWrittenIntoWAL() throws Exception { 728 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 729 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 730 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 731 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 732 deleteDir(basedir); 733 734 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 735 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 736 HBaseTestingUtil.closeRegionAndWAL(region2); 737 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 738 final byte[] rowName = tableName.getName(); 739 final byte[] regionName = hri.getEncodedNameAsBytes(); 740 741 // Add 1k to each family. 742 final int countPerFamily = 1000; 743 Set<byte[]> familyNames = new HashSet<>(); 744 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 745 for (byte[] fam : htd.getColumnFamilyNames()) { 746 scopes.put(fam, 0); 747 } 748 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 749 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes); 750 familyNames.add(hcd.getName()); 751 } 752 753 // Add a cache flush, shouldn't have any effect 754 wal.startCacheFlush(regionName, familyNames); 755 wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM); 756 757 // Add an edit to another family, should be skipped. 758 WALEdit edit = new WALEdit(); 759 long now = ee.currentTime(); 760 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); 761 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 762 edit); 763 764 // Delete the c family to verify deletes make it over. 765 edit = new WALEdit(); 766 now = ee.currentTime(); 767 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 768 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 769 edit); 770 771 // Sync. 772 wal.sync(); 773 // Make a new conf and a new fs for the splitter to run on so we can take 774 // over old wal. 775 final Configuration newConf = HBaseConfiguration.create(this.conf); 776 User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime"); 777 user.runAs(new PrivilegedExceptionAction<Void>() { 778 @Override 779 public Void run() throws Exception { 780 runWALSplit(newConf); 781 FileSystem newFS = FileSystem.get(newConf); 782 // 100k seems to make for about 4 flushes during HRegion#initialize. 783 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 784 // Make a new wal for new region. 785 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 786 final AtomicInteger flushcount = new AtomicInteger(0); 787 try { 788 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 789 @Override 790 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 791 final Collection<HStore> storesToFlush, MonitoredTask status, 792 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 793 LOG.info("InternalFlushCache Invoked"); 794 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 795 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 796 flushcount.incrementAndGet(); 797 return fs; 798 } 799 }; 800 // The seq id this region has opened up with 801 long seqid = region.initialize(); 802 803 // The mvcc readpoint of from inserting data. 804 long writePoint = mvcc.getWritePoint(); 805 806 // We flushed during init. 807 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0); 808 assertTrue((seqid - 1) == writePoint); 809 810 Get get = new Get(rowName); 811 Result result = region.get(get); 812 // Make sure we only see the good edits 813 assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size()); 814 region.close(); 815 } finally { 816 newWal.close(); 817 } 818 return null; 819 } 820 }); 821 } 822 823 @Test 824 // the following test is for HBASE-6065 825 public void testSequentialEditLogSeqNum() throws IOException { 826 final TableName tableName = TableName.valueOf(currentTest.getMethodName()); 827 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 828 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 829 deleteDir(basedir); 830 final byte[] rowName = tableName.getName(); 831 final int countPerFamily = 10; 832 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 833 834 // Mock the WAL 835 MockWAL wal = createMockWAL(); 836 837 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 838 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 839 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 840 } 841 842 // Let us flush the region 843 // But this time completeflushcache is not yet done 844 region.flush(true); 845 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 846 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 847 } 848 long lastestSeqNumber = region.getReadPoint(null); 849 // get the current seq no 850 wal.doCompleteCacheFlush = true; 851 // allow complete cache flush with the previous seq number got after first 852 // set of edits. 853 wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 854 wal.shutdown(); 855 FileStatus[] listStatus = wal.getFiles(); 856 assertNotNull(listStatus); 857 assertTrue(listStatus.length > 0); 858 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, 859 wals, null); 860 FileStatus[] listStatus1 = 861 this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName), 862 new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { 863 @Override 864 public boolean accept(Path p) { 865 return !WALSplitUtil.isSequenceIdFile(p); 866 } 867 }); 868 int editCount = 0; 869 for (FileStatus fileStatus : listStatus1) { 870 editCount = Integer.parseInt(fileStatus.getPath().getName()); 871 } 872 // The sequence number should be same 873 assertEquals( 874 "The sequence number of the recoverd.edits and the current edit seq should be same", 875 lastestSeqNumber, editCount); 876 } 877 878 /** 879 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 880 */ 881 @Test 882 public void testDatalossWhenInputError() throws Exception { 883 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 884 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 885 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 886 deleteDir(basedir); 887 final byte[] rowName = tableName.getName(); 888 final int countPerFamily = 10; 889 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 890 HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 891 Path regionDir = region1.getWALRegionDir(); 892 HBaseTestingUtil.closeRegionAndWAL(region1); 893 894 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 895 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 896 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 897 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 898 } 899 // Now assert edits made it in. 900 final Get g = new Get(rowName); 901 Result result = region.get(g); 902 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 903 // Now close the region (without flush), split the log, reopen the region and assert that 904 // replay of log has the correct effect. 905 region.close(true); 906 wal.shutdown(); 907 908 runWALSplit(this.conf); 909 910 // here we let the DFSInputStream throw an IOException just after the WALHeader. 911 Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); 912 final long headerLength; 913 try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) { 914 headerLength = reader.getPosition(); 915 } 916 FileSystem spyFs = spy(this.fs); 917 doAnswer(new Answer<FSDataInputStream>() { 918 919 @Override 920 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 921 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 922 Field field = FilterInputStream.class.getDeclaredField("in"); 923 field.setAccessible(true); 924 final DFSInputStream in = (DFSInputStream) field.get(stream); 925 DFSInputStream spyIn = spy(in); 926 doAnswer(new Answer<Integer>() { 927 928 private long pos; 929 930 @Override 931 public Integer answer(InvocationOnMock invocation) throws Throwable { 932 if (pos >= headerLength) { 933 throw new IOException("read over limit"); 934 } 935 int b = (Integer) invocation.callRealMethod(); 936 if (b > 0) { 937 pos += b; 938 } 939 return b; 940 } 941 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 942 doAnswer(new Answer<Void>() { 943 944 @Override 945 public Void answer(InvocationOnMock invocation) throws Throwable { 946 invocation.callRealMethod(); 947 in.close(); 948 return null; 949 } 950 }).when(spyIn).close(); 951 field.set(stream, spyIn); 952 return stream; 953 } 954 }).when(spyFs).open(eq(editFile)); 955 956 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 957 HRegion region2; 958 try { 959 // log replay should fail due to the IOException, otherwise we may lose data. 960 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 961 assertEquals(result.size(), region2.get(g).size()); 962 } catch (IOException e) { 963 assertEquals("read over limit", e.getMessage()); 964 } 965 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 966 assertEquals(result.size(), region2.get(g).size()); 967 } 968 969 /** 970 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 971 */ 972 private void testNameConflictWhenSplit(boolean largeFirst) 973 throws IOException, StreamLacksCapabilityException { 974 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 975 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 976 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 977 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 978 deleteDir(basedir); 979 980 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 981 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 982 for (byte[] fam : htd.getColumnFamilyNames()) { 983 scopes.put(fam, 0); 984 } 985 HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 986 HBaseTestingUtil.closeRegionAndWAL(region); 987 final byte[] family = htd.getColumnFamilies()[0].getName(); 988 final byte[] rowName = tableName.getName(); 989 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 990 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 991 992 Path largeFile = new Path(logDir, "wal-1"); 993 Path smallFile = new Path(logDir, "wal-2"); 994 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 995 writerWALFile(smallFile, Arrays.asList(entry2)); 996 FileStatus first, second; 997 if (largeFirst) { 998 first = fs.getFileStatus(largeFile); 999 second = fs.getFileStatus(smallFile); 1000 } else { 1001 first = fs.getFileStatus(smallFile); 1002 second = fs.getFileStatus(largeFile); 1003 } 1004 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); 1005 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); 1006 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 1007 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1008 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1009 assertEquals(2, region.get(new Get(rowName)).size()); 1010 } 1011 1012 @Test 1013 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1014 testNameConflictWhenSplit(true); 1015 } 1016 1017 @Test 1018 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1019 testNameConflictWhenSplit(false); 1020 } 1021 1022 static class MockWAL extends FSHLog { 1023 boolean doCompleteCacheFlush = false; 1024 1025 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1026 throws IOException { 1027 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1028 } 1029 1030 @Override 1031 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 1032 if (!doCompleteCacheFlush) { 1033 return; 1034 } 1035 super.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 1036 } 1037 } 1038 1039 private TableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1040 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1041 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1042 return builder.build(); 1043 } 1044 1045 private MockWAL createMockWAL() throws IOException { 1046 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1047 wal.init(); 1048 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1049 // long gone. 1050 HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1051 return wal; 1052 } 1053 1054 // Flusher used in this test. Keep count of how often we are called and 1055 // actually run the flush inside here. 1056 static class TestFlusher implements FlushRequester { 1057 private HRegion r; 1058 1059 @Override 1060 public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { 1061 try { 1062 r.flush(false); 1063 return true; 1064 } catch (IOException e) { 1065 throw new RuntimeException("Exception flushing", e); 1066 } 1067 } 1068 1069 @Override 1070 public boolean requestFlush(HRegion region, List<byte[]> families, 1071 FlushLifeCycleTracker tracker) { 1072 return true; 1073 } 1074 1075 @Override 1076 public boolean requestDelayedFlush(HRegion region, long when) { 1077 return true; 1078 } 1079 1080 @Override 1081 public void registerFlushRequestListener(FlushRequestListener listener) { 1082 1083 } 1084 1085 @Override 1086 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1087 return false; 1088 } 1089 1090 @Override 1091 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1092 1093 } 1094 } 1095 1096 private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri, 1097 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1098 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1099 } 1100 1101 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1102 int index) { 1103 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1104 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1105 WALEdit edit = new WALEdit(); 1106 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1107 return edit; 1108 } 1109 1110 private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence, 1111 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1112 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1113 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1114 createWALEdit(rowName, family, ee, index), hri, true, null); 1115 entry.stampRegionSequenceId(mvcc.begin()); 1116 return entry; 1117 } 1118 1119 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 1120 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1121 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) 1122 throws IOException { 1123 for (int j = 0; j < count; j++) { 1124 wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), 1125 createWALEdit(rowName, family, ee, j)); 1126 } 1127 wal.sync(); 1128 } 1129 1130 public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1131 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1132 List<Put> puts = new ArrayList<>(); 1133 for (int j = 0; j < count; j++) { 1134 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1135 Put p = new Put(rowName); 1136 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1137 r.put(p); 1138 puts.add(p); 1139 } 1140 return puts; 1141 } 1142 1143 /** 1144 * Creates an HRI around an HTD that has <code>tableName</code> and three column families named 1145 * 'a','b', and 'c'. 1146 * @param tableName Name of table to use when we create HTableDescriptor. 1147 */ 1148 private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1149 return RegionInfoBuilder.newBuilder(tableName).build(); 1150 } 1151 1152 /** 1153 * Run the split. Verify only single split file made. 1154 * @return The single split file made 1155 */ 1156 private Path runWALSplit(final Configuration c) throws IOException { 1157 List<Path> splits = 1158 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1159 // Split should generate only 1 file since there's only 1 region 1160 assertEquals("splits=" + splits, 1, splits.size()); 1161 // Make sure the file exists 1162 assertTrue(fs.exists(splits.get(0))); 1163 LOG.info("Split file=" + splits.get(0)); 1164 return splits.get(0); 1165 } 1166 1167 private TableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1168 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1169 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1170 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b"))); 1171 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c"))); 1172 return builder.build(); 1173 } 1174 1175 private void writerWALFile(Path file, List<FSWALEntry> entries) 1176 throws IOException, StreamLacksCapabilityException { 1177 fs.mkdirs(file.getParent()); 1178 ProtobufLogWriter writer = new ProtobufLogWriter(); 1179 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file), 1180 StreamSlowMonitor.create(conf, "testMonitor")); 1181 for (FSWALEntry entry : entries) { 1182 writer.append(entry); 1183 } 1184 writer.sync(false); 1185 writer.close(); 1186 } 1187 1188 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1189 throws IOException; 1190}