001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyInt; 026import static org.mockito.ArgumentMatchers.eq; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.spy; 029import static org.mockito.Mockito.when; 030 031import java.io.FilterInputStream; 032import java.io.IOException; 033import java.lang.reflect.Field; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collection; 038import java.util.HashSet; 039import java.util.List; 040import java.util.NavigableMap; 041import java.util.Set; 042import java.util.TreeMap; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicInteger; 045import java.util.function.Consumer; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.FSDataInputStream; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.ExtendedCell; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.HBaseTestingUtil; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.KeyValue; 058import org.apache.hadoop.hbase.ServerName; 059import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 063import org.apache.hadoop.hbase.client.Delete; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.RegionInfo; 067import org.apache.hadoop.hbase.client.RegionInfoBuilder; 068import org.apache.hadoop.hbase.client.Result; 069import org.apache.hadoop.hbase.client.ResultScanner; 070import org.apache.hadoop.hbase.client.Scan; 071import org.apache.hadoop.hbase.client.Table; 072import org.apache.hadoop.hbase.client.TableDescriptor; 073import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 074import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 075import org.apache.hadoop.hbase.monitoring.MonitoredTask; 076import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 077import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; 078import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 079import org.apache.hadoop.hbase.regionserver.FlushRequestListener; 080import org.apache.hadoop.hbase.regionserver.FlushRequester; 081import org.apache.hadoop.hbase.regionserver.HRegion; 082import org.apache.hadoop.hbase.regionserver.HRegionServer; 083import org.apache.hadoop.hbase.regionserver.HStore; 084import org.apache.hadoop.hbase.regionserver.MemStoreSizing; 085import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; 086import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 087import org.apache.hadoop.hbase.regionserver.Region; 088import org.apache.hadoop.hbase.regionserver.RegionScanner; 089import org.apache.hadoop.hbase.regionserver.RegionServerServices; 090import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 091import org.apache.hadoop.hbase.security.User; 092import org.apache.hadoop.hbase.util.Bytes; 093import org.apache.hadoop.hbase.util.CommonFSUtils; 094import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 095import org.apache.hadoop.hbase.util.EnvironmentEdge; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.HFileTestUtil; 098import org.apache.hadoop.hbase.util.Pair; 099import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 100import org.apache.hadoop.hbase.wal.WAL; 101import org.apache.hadoop.hbase.wal.WALEdit; 102import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 103import org.apache.hadoop.hbase.wal.WALFactory; 104import org.apache.hadoop.hbase.wal.WALKeyImpl; 105import org.apache.hadoop.hbase.wal.WALSplitUtil; 106import org.apache.hadoop.hbase.wal.WALSplitter; 107import org.apache.hadoop.hbase.wal.WALStreamReader; 108import org.apache.hadoop.hdfs.DFSInputStream; 109import org.junit.jupiter.api.AfterAll; 110import org.junit.jupiter.api.AfterEach; 111import org.junit.jupiter.api.BeforeEach; 112import org.junit.jupiter.api.Test; 113import org.junit.jupiter.api.TestInfo; 114import org.mockito.Mockito; 115import org.mockito.invocation.InvocationOnMock; 116import org.mockito.stubbing.Answer; 117import org.slf4j.Logger; 118import org.slf4j.LoggerFactory; 119 120/** 121 * Test replay of edits out of a WAL split. 122 */ 123public abstract class AbstractTestWALReplay { 124 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class); 125 static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 126 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); 127 private Path hbaseRootDir = null; 128 private String logName; 129 private Path oldLogDir; 130 private Path logDir; 131 private FileSystem fs; 132 private Configuration conf; 133 private WALFactory wals; 134 135 private String currentTest; 136 137 public static void setUpBeforeClass() throws Exception { 138 Configuration conf = TEST_UTIL.getConfiguration(); 139 // The below config supported by 0.20-append and CDH3b2 140 conf.setInt("dfs.client.block.recovery.retries", 2); 141 TEST_UTIL.startMiniCluster(3); 142 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); 143 LOG.info("hbase.rootdir=" + hbaseRootDir); 144 CommonFSUtils.setRootDir(conf, hbaseRootDir); 145 } 146 147 @AfterAll 148 public static void tearDownAfterClass() throws Exception { 149 TEST_UTIL.shutdownMiniCluster(); 150 } 151 152 @BeforeEach 153 public void setUp(TestInfo testInfo) throws Exception { 154 currentTest = testInfo.getTestMethod().get().getName(); 155 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 156 this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); 157 this.hbaseRootDir = CommonFSUtils.getRootDir(this.conf); 158 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 159 String serverName = ServerName 160 .valueOf(currentTest + "-manual", 16010, EnvironmentEdgeManager.currentTime()).toString(); 161 this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName); 162 this.logDir = new Path(this.hbaseRootDir, logName); 163 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { 164 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 165 } 166 this.wals = new WALFactory(conf, currentTest); 167 } 168 169 @AfterEach 170 public void tearDown() throws Exception { 171 this.wals.close(); 172 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); 173 } 174 175 /* 176 * @param p Directory to cleanup 177 */ 178 private void deleteDir(final Path p) throws IOException { 179 if (this.fs.exists(p)) { 180 if (!this.fs.delete(p, true)) { 181 throw new IOException("Failed remove of " + p); 182 } 183 } 184 } 185 186 /** 187 * */ 188 @Test 189 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception { 190 final TableName tableName = TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF"); 191 byte[] family1 = Bytes.toBytes("cf1"); 192 byte[] family2 = Bytes.toBytes("cf2"); 193 byte[] qualifier = Bytes.toBytes("q"); 194 byte[] value = Bytes.toBytes("testV"); 195 byte[][] familys = { family1, family2 }; 196 TEST_UTIL.createTable(tableName, familys); 197 Table htable = TEST_UTIL.getConnection().getTable(tableName); 198 Put put = new Put(Bytes.toBytes("r1")); 199 put.addColumn(family1, qualifier, value); 200 htable.put(put); 201 ResultScanner resultScanner = htable.getScanner(new Scan()); 202 int count = 0; 203 while (resultScanner.next() != null) { 204 count++; 205 } 206 resultScanner.close(); 207 assertEquals(1, count); 208 209 SingleProcessHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster(); 210 List<HRegion> regions = hbaseCluster.getRegions(tableName); 211 assertEquals(1, regions.size()); 212 213 // move region to another regionserver 214 Region destRegion = regions.get(0); 215 int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName()); 216 assertTrue(hbaseCluster.getRegionServerThreads().size() > 1, 217 "Please start more than 1 regionserver"); 218 int destServerNum = 0; 219 while (destServerNum == originServerNum) { 220 destServerNum++; 221 } 222 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum); 223 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum); 224 // move region to destination regionserver 225 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), destServer.getServerName()); 226 227 // delete the row 228 Delete del = new Delete(Bytes.toBytes("r1")); 229 htable.delete(del); 230 resultScanner = htable.getScanner(new Scan()); 231 count = 0; 232 while (resultScanner.next() != null) { 233 count++; 234 } 235 resultScanner.close(); 236 assertEquals(0, count); 237 238 // flush region and make major compaction 239 HRegion region = 240 (HRegion) destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName()); 241 region.flush(true); 242 // wait to complete major compaction 243 for (HStore store : region.getStores()) { 244 store.triggerMajorCompaction(); 245 } 246 region.compact(true); 247 248 // move region to origin regionserver 249 TEST_UTIL.moveRegionAndWait(destRegion.getRegionInfo(), originServer.getServerName()); 250 // abort the origin regionserver 251 originServer.abort("testing"); 252 253 // see what we get 254 Result result = htable.get(new Get(Bytes.toBytes("r1"))); 255 if (result != null) { 256 assertTrue((result == null) || result.isEmpty(), 257 "Row is deleted, but we get" + result.toString()); 258 } 259 resultScanner.close(); 260 } 261 262 /** 263 * Tests for hbase-2727. 264 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a> 265 */ 266 @Test 267 public void test2727() throws Exception { 268 // Test being able to have > 1 set of edits in the recovered.edits directory. 269 // Ensure edits are replayed properly. 270 final TableName tableName = TableName.valueOf("test2727"); 271 272 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 273 RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 274 Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 275 deleteDir(basedir); 276 277 TableDescriptor tableDescriptor = createBasic3FamilyHTD(tableName); 278 Region region2 = 279 HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, tableDescriptor); 280 HBaseTestingUtil.closeRegionAndWAL(region2); 281 final byte[] rowName = tableName.getName(); 282 283 WAL wal1 = createWAL(this.conf, hbaseRootDir, logName); 284 // Add 1k to each family. 285 final int countPerFamily = 1000; 286 287 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 288 for (byte[] fam : tableDescriptor.getColumnFamilyNames()) { 289 scopes.put(fam, 0); 290 } 291 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 292 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal1, 293 mvcc, scopes); 294 } 295 wal1.shutdown(); 296 runWALSplit(this.conf); 297 298 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 299 // Add 1k to each family. 300 for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { 301 addWALEdits(tableName, hri, rowName, familyDescriptor.getName(), countPerFamily, ee, wal2, 302 mvcc, scopes); 303 } 304 wal2.shutdown(); 305 runWALSplit(this.conf); 306 307 WAL wal3 = createWAL(this.conf, hbaseRootDir, logName); 308 try { 309 HRegion region = 310 HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, tableDescriptor, wal3); 311 long seqid = region.getOpenSeqNum(); 312 // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1. 313 // When opened, this region would apply 6k edits, and increment the sequenceId by 1 314 assertTrue(seqid > mvcc.getWritePoint()); 315 assertEquals(seqid - 1, mvcc.getWritePoint()); 316 LOG.debug( 317 "region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: " + mvcc.getReadPoint()); 318 319 // TODO: Scan all. 320 region.close(); 321 } finally { 322 wal3.close(); 323 } 324 } 325 326 /** 327 * Test case of HRegion that is only made out of bulk loaded files. Assert that we don't 'crash'. 328 */ 329 @Test 330 public void testRegionMadeOfBulkLoadedFilesOnly() throws IOException, SecurityException, 331 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 332 final TableName tableName = TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly"); 333 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 334 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 335 deleteDir(basedir); 336 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 337 Region region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 338 HBaseTestingUtil.closeRegionAndWAL(region2); 339 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 340 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 341 342 byte[] family = htd.getColumnFamilies()[0].getName(); 343 Path f = new Path(basedir, "hfile"); 344 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""), 345 Bytes.toBytes("z"), 10); 346 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 347 hfs.add(Pair.newPair(family, f.toString())); 348 region.bulkLoadHFiles(hfs, true, null); 349 350 // Add an edit so something in the WAL 351 byte[] row = tableName.getName(); 352 region.put((new Put(row)).addColumn(family, family, family)); 353 wal.sync(); 354 final int rowsInsertedCount = 11; 355 356 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 357 358 // Now 'crash' the region by stealing its wal 359 final Configuration newConf = HBaseConfiguration.create(this.conf); 360 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 361 user.runAs(new PrivilegedExceptionAction() { 362 @Override 363 public Object run() throws Exception { 364 runWALSplit(newConf); 365 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 366 367 HRegion region2 = 368 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 369 long seqid2 = region2.getOpenSeqNum(); 370 assertTrue(seqid2 > -1); 371 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 372 373 // I can't close wal1. Its been appropriated when we split. 374 region2.close(); 375 wal2.close(); 376 return null; 377 } 378 }); 379 } 380 381 /** 382 * HRegion test case that is made of a major compacted HFile (created with three bulk loaded 383 * files) and an edit in the memstore. 384 * <p/> 385 * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries from 386 * being replayed" 387 */ 388 @Test 389 public void testCompactedBulkLoadedFiles() throws IOException, SecurityException, 390 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 391 final TableName tableName = TableName.valueOf("testCompactedBulkLoadedFiles"); 392 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 393 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString()); 394 deleteDir(basedir); 395 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 396 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 397 HBaseTestingUtil.closeRegionAndWAL(region2); 398 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 399 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); 400 401 // Add an edit so something in the WAL 402 byte[] row = tableName.getName(); 403 byte[] family = htd.getColumnFamilies()[0].getName(); 404 region.put((new Put(row)).addColumn(family, family, family)); 405 wal.sync(); 406 407 List<Pair<byte[], String>> hfs = new ArrayList<>(1); 408 for (int i = 0; i < 3; i++) { 409 Path f = new Path(basedir, "hfile" + i); 410 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"), 411 Bytes.toBytes(i + "50"), 10); 412 hfs.add(Pair.newPair(family, f.toString())); 413 } 414 region.bulkLoadHFiles(hfs, true, null); 415 final int rowsInsertedCount = 31; 416 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 417 418 // major compact to turn all the bulk loaded files into one normal file 419 region.compact(true); 420 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan()))); 421 422 // Now 'crash' the region by stealing its wal 423 final Configuration newConf = HBaseConfiguration.create(this.conf); 424 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 425 user.runAs(new PrivilegedExceptionAction() { 426 @Override 427 public Object run() throws Exception { 428 runWALSplit(newConf); 429 WAL wal2 = createWAL(newConf, hbaseRootDir, logName); 430 431 HRegion region2 = 432 HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2); 433 long seqid2 = region2.getOpenSeqNum(); 434 assertTrue(seqid2 > -1); 435 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan()))); 436 437 // I can't close wal1. Its been appropriated when we split. 438 region2.close(); 439 wal2.close(); 440 return null; 441 } 442 }); 443 } 444 445 /** 446 * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify 447 * seqids. 448 */ 449 @Test 450 public void testReplayEditsWrittenViaHRegion() throws IOException, SecurityException, 451 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 452 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 453 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 454 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 455 deleteDir(basedir); 456 final byte[] rowName = tableName.getName(); 457 final int countPerFamily = 10; 458 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 459 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 460 HBaseTestingUtil.closeRegionAndWAL(region3); 461 // Write countPerFamily edits into the three families. Do a flush on one 462 // of the families during the load of edits so its seqid is not same as 463 // others to test we do right thing when different seqids. 464 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 465 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 466 long seqid = region.getOpenSeqNum(); 467 boolean first = true; 468 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 469 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 470 if (first) { 471 // If first, so we have at least one family w/ different seqid to rest. 472 region.flush(true); 473 first = false; 474 } 475 } 476 // Now assert edits made it in. 477 final Get g = new Get(rowName); 478 Result result = region.get(g); 479 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 480 // Now close the region (without flush), split the log, reopen the region and assert that 481 // replay of log has the correct effect, that our seqids are calculated correctly so 482 // all edits in logs are seen as 'stale'/old. 483 region.close(true); 484 wal.shutdown(); 485 runWALSplit(this.conf); 486 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 487 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2); 488 long seqid2 = region2.getOpenSeqNum(); 489 assertTrue(seqid + result.size() < seqid2); 490 final Result result1b = region2.get(g); 491 assertEquals(result.size(), result1b.size()); 492 493 // Next test. Add more edits, then 'crash' this region by stealing its wal 494 // out from under it and assert that replay of the log adds the edits back 495 // correctly when region is opened again. 496 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 497 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); 498 } 499 // Get count of edits. 500 final Result result2 = region2.get(g); 501 assertEquals(2 * result.size(), result2.size()); 502 wal2.sync(); 503 final Configuration newConf = HBaseConfiguration.create(this.conf); 504 User user = HBaseTestingUtil.getDifferentUser(newConf, tableName.getNameAsString()); 505 user.runAs(new PrivilegedExceptionAction<Object>() { 506 @Override 507 public Object run() throws Exception { 508 runWALSplit(newConf); 509 FileSystem newFS = FileSystem.get(newConf); 510 // Make a new wal for new region open. 511 WAL wal3 = createWAL(newConf, hbaseRootDir, logName); 512 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); 513 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { 514 @Override 515 protected void restoreEdit(HStore s, ExtendedCell cell, MemStoreSizing memstoreSizing) { 516 super.restoreEdit(s, cell, memstoreSizing); 517 countOfRestoredEdits.incrementAndGet(); 518 } 519 }; 520 long seqid3 = region3.initialize(); 521 Result result3 = region3.get(g); 522 // Assert that count of cells is same as before crash. 523 assertEquals(result2.size(), result3.size()); 524 assertEquals(htd.getColumnFamilies().length * countPerFamily, countOfRestoredEdits.get()); 525 526 // I can't close wal1. Its been appropriated when we split. 527 region3.close(); 528 wal3.close(); 529 return null; 530 } 531 }); 532 } 533 534 /** 535 * Test that we recover correctly when there is a failure in between the flushes. i.e. Some stores 536 * got flushed but others did not. 537 * <p/> 538 * Unfortunately, there is no easy hook to flush at a store level. The way we get around this is 539 * by flushing at the region level, and then deleting the recently flushed store file for one of 540 * the Stores. This would put us back in the situation where all but that store got flushed and 541 * the region died. 542 * <p/> 543 * We restart Region again, and verify that the edits were replayed. 544 */ 545 @Test 546 public void testReplayEditsAfterPartialFlush() throws IOException, SecurityException, 547 IllegalArgumentException, NoSuchFieldException, IllegalAccessException, InterruptedException { 548 final TableName tableName = TableName.valueOf("testReplayEditsWrittenViaHRegion"); 549 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 550 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 551 deleteDir(basedir); 552 final byte[] rowName = tableName.getName(); 553 final int countPerFamily = 10; 554 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 555 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 556 HBaseTestingUtil.closeRegionAndWAL(region3); 557 // Write countPerFamily edits into the three families. Do a flush on one 558 // of the families during the load of edits so its seqid is not same as 559 // others to test we do right thing when different seqids. 560 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 561 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 562 long seqid = region.getOpenSeqNum(); 563 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 564 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 565 } 566 567 // Now assert edits made it in. 568 final Get g = new Get(rowName); 569 Result result = region.get(g); 570 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 571 572 // Let us flush the region 573 region.flush(true); 574 region.close(true); 575 wal.shutdown(); 576 577 // delete the store files in the second column family to simulate a failure 578 // in between the flushcache(); 579 // we have 3 families. killing the middle one ensures that taking the maximum 580 // will make us fail. 581 int cf_count = 0; 582 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 583 cf_count++; 584 if (cf_count == 2) { 585 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString()); 586 } 587 } 588 589 // Let us try to split and recover 590 runWALSplit(this.conf); 591 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 592 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2); 593 long seqid2 = region2.getOpenSeqNum(); 594 assertTrue(seqid + result.size() < seqid2); 595 596 final Result result1b = region2.get(g); 597 assertEquals(result.size(), result1b.size()); 598 } 599 600 // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush. 601 // Only throws exception if throwExceptionWhenFlushing is set true. 602 public static class CustomStoreFlusher extends DefaultStoreFlusher { 603 // Switch between throw and not throw exception in flush 604 public static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); 605 606 public CustomStoreFlusher(Configuration conf, HStore store) { 607 super(conf, store); 608 } 609 610 @Override 611 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 612 MonitoredTask status, ThroughputController throughputController, 613 FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException { 614 if (throwExceptionWhenFlushing.get()) { 615 throw new IOException("Simulated exception by tests"); 616 } 617 return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker, 618 writerCreationTracker); 619 } 620 } 621 622 /** 623 * Test that we could recover the data correctly after aborting flush. In the test, first we abort 624 * flush after writing some data, then writing more data and flush again, at last verify the data. 625 */ 626 @Test 627 public void testReplayEditsAfterAbortingFlush() throws IOException { 628 final TableName tableName = TableName.valueOf("testReplayEditsAfterAbortingFlush"); 629 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 630 final Path basedir = CommonFSUtils.getTableDir(this.hbaseRootDir, tableName); 631 deleteDir(basedir); 632 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 633 HRegion region3 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 634 HBaseTestingUtil.closeRegionAndWAL(region3); 635 // Write countPerFamily edits into the three families. Do a flush on one 636 // of the families during the load of edits so its seqid is not same as 637 // others to test we do right thing when different seqids. 638 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 639 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); 640 Mockito.doReturn(false).when(rsServices).isAborted(); 641 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); 642 when(rsServices.getConfiguration()).thenReturn(conf); 643 Configuration customConf = new Configuration(this.conf); 644 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, 645 CustomStoreFlusher.class.getName()); 646 HRegion region = 647 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null); 648 int writtenRowCount = 10; 649 List<ColumnFamilyDescriptor> families = Arrays.asList((htd.getColumnFamilies())); 650 for (int i = 0; i < writtenRowCount; i++) { 651 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 652 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 653 Bytes.toBytes("val")); 654 region.put(put); 655 } 656 657 // Now assert edits made it in. 658 RegionScanner scanner = region.getScanner(new Scan()); 659 assertEquals(writtenRowCount, getScannedCount(scanner)); 660 661 // Let us flush the region 662 CustomStoreFlusher.throwExceptionWhenFlushing.set(true); 663 try { 664 region.flush(true); 665 fail("Injected exception hasn't been thrown"); 666 } catch (IOException e) { 667 LOG.info("Expected simulated exception when flushing region, {}", e.getMessage()); 668 // simulated to abort server 669 Mockito.doReturn(true).when(rsServices).isAborted(); 670 region.setClosing(false); // region normally does not accept writes after 671 // DroppedSnapshotException. We mock around it for this test. 672 } 673 // writing more data 674 int moreRow = 10; 675 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) { 676 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i))); 677 put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"), 678 Bytes.toBytes("val")); 679 region.put(put); 680 } 681 writtenRowCount += moreRow; 682 // call flush again 683 CustomStoreFlusher.throwExceptionWhenFlushing.set(false); 684 try { 685 region.flush(true); 686 } catch (IOException t) { 687 LOG.info( 688 "Expected exception when flushing region because server is stopped," + t.getMessage()); 689 } 690 691 region.close(true); 692 wal.shutdown(); 693 694 // Let us try to split and recover 695 runWALSplit(this.conf); 696 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 697 Mockito.doReturn(false).when(rsServices).isAborted(); 698 HRegion region2 = 699 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null); 700 scanner = region2.getScanner(new Scan()); 701 assertEquals(writtenRowCount, getScannedCount(scanner)); 702 } 703 704 private int getScannedCount(RegionScanner scanner) throws IOException { 705 int scannedCount = 0; 706 List<Cell> results = new ArrayList<>(); 707 while (true) { 708 boolean existMore = scanner.next(results); 709 if (!results.isEmpty()) { 710 scannedCount++; 711 } 712 if (!existMore) { 713 break; 714 } 715 results.clear(); 716 } 717 return scannedCount; 718 } 719 720 /** 721 * Create an HRegion with the result of a WAL split and test we only see the good edits= 722 */ 723 @Test 724 public void testReplayEditsWrittenIntoWAL() throws Exception { 725 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 726 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 727 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 728 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 729 deleteDir(basedir); 730 731 final TableDescriptor htd = createBasic3FamilyHTD(tableName); 732 HRegion region2 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 733 HBaseTestingUtil.closeRegionAndWAL(region2); 734 final WAL wal = createWAL(this.conf, hbaseRootDir, logName); 735 final byte[] rowName = tableName.getName(); 736 final byte[] regionName = hri.getEncodedNameAsBytes(); 737 738 // Add 1k to each family. 739 final int countPerFamily = 1000; 740 Set<byte[]> familyNames = new HashSet<>(); 741 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 742 for (byte[] fam : htd.getColumnFamilyNames()) { 743 scopes.put(fam, 0); 744 } 745 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 746 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, mvcc, scopes); 747 familyNames.add(hcd.getName()); 748 } 749 750 // Add a cache flush, shouldn't have any effect 751 wal.startCacheFlush(regionName, familyNames); 752 wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM); 753 754 // Add an edit to another family, should be skipped. 755 WALEdit edit = new WALEdit(); 756 long now = ee.currentTime(); 757 WALEditInternalHelper.addExtendedCell(edit, 758 new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); 759 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 760 edit); 761 762 // Delete the c family to verify deletes make it over. 763 edit = new WALEdit(); 764 now = ee.currentTime(); 765 WALEditInternalHelper.addExtendedCell(edit, 766 new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); 767 wal.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), 768 edit); 769 770 // Sync. 771 wal.sync(); 772 // Make a new conf and a new fs for the splitter to run on so we can take 773 // over old wal. 774 final Configuration newConf = HBaseConfiguration.create(this.conf); 775 User user = HBaseTestingUtil.getDifferentUser(newConf, ".replay.wal.secondtime"); 776 user.runAs(new PrivilegedExceptionAction<Void>() { 777 @Override 778 public Void run() throws Exception { 779 runWALSplit(newConf); 780 FileSystem newFS = FileSystem.get(newConf); 781 // 100k seems to make for about 4 flushes during HRegion#initialize. 782 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100); 783 // Make a new wal for new region. 784 WAL newWal = createWAL(newConf, hbaseRootDir, logName); 785 final AtomicInteger flushcount = new AtomicInteger(0); 786 try { 787 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { 788 @Override 789 protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, 790 final Collection<HStore> storesToFlush, MonitoredTask status, 791 boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { 792 LOG.info("InternalFlushCache Invoked"); 793 FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, 794 Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); 795 flushcount.incrementAndGet(); 796 return fs; 797 } 798 }; 799 // The seq id this region has opened up with 800 long seqid = region.initialize(); 801 802 // The mvcc readpoint of from inserting data. 803 long writePoint = mvcc.getWritePoint(); 804 805 // We flushed during init. 806 assertTrue(flushcount.get() > 0, "Flushcount=" + flushcount.get()); 807 assertTrue((seqid - 1) == writePoint); 808 809 Get get = new Get(rowName); 810 Result result = region.get(get); 811 // Make sure we only see the good edits 812 assertEquals(countPerFamily * (htd.getColumnFamilies().length - 1), result.size()); 813 region.close(); 814 } finally { 815 newWal.close(); 816 } 817 return null; 818 } 819 }); 820 } 821 822 @Test 823 // the following test is for HBASE-6065 824 public void testSequentialEditLogSeqNum() throws IOException { 825 final TableName tableName = TableName.valueOf(currentTest); 826 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 827 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 828 deleteDir(basedir); 829 final byte[] rowName = tableName.getName(); 830 final int countPerFamily = 10; 831 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 832 833 // Mock the WAL 834 MockWAL wal = createMockWAL(); 835 TEST_UTIL.createRegionDir(hri, 836 TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem()); 837 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 838 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 839 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 840 } 841 842 // Let us flush the region 843 // But this time completeflushcache is not yet done 844 region.flush(true); 845 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 846 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); 847 } 848 long lastestSeqNumber = region.getReadPoint(null); 849 // get the current seq no 850 wal.doCompleteCacheFlush = true; 851 // allow complete cache flush with the previous seq number got after first 852 // set of edits. 853 wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); 854 wal.shutdown(); 855 FileStatus[] listStatus = wal.getFiles(); 856 assertNotNull(listStatus); 857 assertTrue(listStatus.length > 0); 858 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, 859 wals, null); 860 FileStatus[] listStatus1 = 861 this.fs.listStatus(new Path(CommonFSUtils.getWALTableDir(conf, tableName), 862 new Path(hri.getEncodedName(), "recovered.edits")), new PathFilter() { 863 @Override 864 public boolean accept(Path p) { 865 return !WALSplitUtil.isSequenceIdFile(p); 866 } 867 }); 868 int editCount = 0; 869 for (FileStatus fileStatus : listStatus1) { 870 editCount = Integer.parseInt(fileStatus.getPath().getName()); 871 } 872 // The sequence number should be same 873 assertEquals(lastestSeqNumber, editCount, 874 "The sequence number of the recoverd.edits and the current edit seq should be same"); 875 } 876 877 /** 878 * testcase for https://issues.apache.org/jira/browse/HBASE-15252 879 */ 880 @Test 881 public void testDatalossWhenInputError() throws Exception { 882 final TableName tableName = TableName.valueOf("testDatalossWhenInputError"); 883 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 884 final Path basedir = CommonFSUtils.getWALTableDir(conf, tableName); 885 deleteDir(basedir); 886 final byte[] rowName = tableName.getName(); 887 final int countPerFamily = 10; 888 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 889 HRegion region1 = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 890 Path regionDir = region1.getWALRegionDir(); 891 HBaseTestingUtil.closeRegionAndWAL(region1); 892 893 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 894 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal); 895 for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { 896 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x"); 897 } 898 // Now assert edits made it in. 899 final Get g = new Get(rowName); 900 Result result = region.get(g); 901 assertEquals(countPerFamily * htd.getColumnFamilies().length, result.size()); 902 // Now close the region (without flush), split the log, reopen the region and assert that 903 // replay of log has the correct effect. 904 region.close(true); 905 wal.shutdown(); 906 907 runWALSplit(this.conf); 908 909 // here we let the DFSInputStream throw an IOException just after the WALHeader. 910 Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); 911 final long headerLength; 912 try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) { 913 headerLength = reader.getPosition(); 914 } 915 FileSystem spyFs = spy(this.fs); 916 doAnswer(new Answer<FSDataInputStream>() { 917 918 @Override 919 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable { 920 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod(); 921 Field field = FilterInputStream.class.getDeclaredField("in"); 922 field.setAccessible(true); 923 final DFSInputStream in = (DFSInputStream) field.get(stream); 924 DFSInputStream spyIn = spy(in); 925 doAnswer(new Answer<Integer>() { 926 927 private long pos; 928 929 @Override 930 public Integer answer(InvocationOnMock invocation) throws Throwable { 931 if (pos >= headerLength) { 932 throw new IOException("read over limit"); 933 } 934 int b = (Integer) invocation.callRealMethod(); 935 if (b > 0) { 936 pos += b; 937 } 938 return b; 939 } 940 }).when(spyIn).read(any(byte[].class), anyInt(), anyInt()); 941 doAnswer(new Answer<Void>() { 942 943 @Override 944 public Void answer(InvocationOnMock invocation) throws Throwable { 945 invocation.callRealMethod(); 946 in.close(); 947 return null; 948 } 949 }).when(spyIn).close(); 950 field.set(stream, spyIn); 951 return stream; 952 } 953 }).when(spyFs).open(eq(editFile)); 954 955 WAL wal2 = createWAL(this.conf, hbaseRootDir, logName); 956 HRegion region2; 957 try { 958 // log replay should fail due to the IOException, otherwise we may lose data. 959 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2); 960 assertEquals(result.size(), region2.get(g).size()); 961 } catch (IOException e) { 962 assertEquals("read over limit", e.getMessage()); 963 } 964 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2); 965 assertEquals(result.size(), region2.get(g).size()); 966 } 967 968 /** 969 * testcase for https://issues.apache.org/jira/browse/HBASE-14949. 970 */ 971 private void testNameConflictWhenSplit(boolean largeFirst) 972 throws IOException, StreamLacksCapabilityException { 973 final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); 974 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 975 final RegionInfo hri = createBasic3FamilyHRegionInfo(tableName); 976 final Path basedir = CommonFSUtils.getTableDir(hbaseRootDir, tableName); 977 deleteDir(basedir); 978 979 final TableDescriptor htd = createBasic1FamilyHTD(tableName); 980 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 981 for (byte[] fam : htd.getColumnFamilyNames()) { 982 scopes.put(fam, 0); 983 } 984 HRegion region = HBaseTestingUtil.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); 985 HBaseTestingUtil.closeRegionAndWAL(region); 986 final byte[] family = htd.getColumnFamilies()[0].getName(); 987 final byte[] rowName = tableName.getName(); 988 FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes); 989 FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes); 990 991 Path largeFile = new Path(logDir, "wal-1"); 992 Path smallFile = new Path(logDir, "wal-2"); 993 writerWALFile(largeFile, Arrays.asList(entry1, entry2)); 994 writerWALFile(smallFile, Arrays.asList(entry2)); 995 FileStatus first, second; 996 if (largeFirst) { 997 first = fs.getFileStatus(largeFile); 998 second = fs.getFileStatus(smallFile); 999 } else { 1000 first = fs.getFileStatus(smallFile); 1001 second = fs.getFileStatus(largeFile); 1002 } 1003 WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null); 1004 WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null); 1005 WAL wal = createWAL(this.conf, hbaseRootDir, logName); 1006 region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); 1007 assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); 1008 assertEquals(2, region.get(new Get(rowName)).size()); 1009 } 1010 1011 @Test 1012 public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException { 1013 testNameConflictWhenSplit(true); 1014 } 1015 1016 @Test 1017 public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException { 1018 testNameConflictWhenSplit(false); 1019 } 1020 1021 static class MockWAL extends FSHLog { 1022 boolean doCompleteCacheFlush = false; 1023 1024 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) 1025 throws IOException { 1026 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 1027 } 1028 1029 @Override 1030 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 1031 if (!doCompleteCacheFlush) { 1032 return; 1033 } 1034 super.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 1035 } 1036 } 1037 1038 private TableDescriptor createBasic1FamilyHTD(final TableName tableName) { 1039 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1040 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1041 return builder.build(); 1042 } 1043 1044 private MockWAL createMockWAL() throws IOException { 1045 fs.mkdirs(new Path(hbaseRootDir, logName)); 1046 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); 1047 wal.init(); 1048 // Set down maximum recovery so we dfsclient doesn't linger retrying something 1049 // long gone. 1050 HBaseTestingUtil.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); 1051 return wal; 1052 } 1053 1054 // Flusher used in this test. Keep count of how often we are called and 1055 // actually run the flush inside here. 1056 static class TestFlusher implements FlushRequester { 1057 private HRegion r; 1058 1059 @Override 1060 public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) { 1061 try { 1062 r.flush(false); 1063 return true; 1064 } catch (IOException e) { 1065 throw new RuntimeException("Exception flushing", e); 1066 } 1067 } 1068 1069 @Override 1070 public boolean requestFlush(HRegion region, List<byte[]> families, 1071 FlushLifeCycleTracker tracker) { 1072 return true; 1073 } 1074 1075 @Override 1076 public boolean requestDelayedFlush(HRegion region, long when) { 1077 return true; 1078 } 1079 1080 @Override 1081 public void registerFlushRequestListener(FlushRequestListener listener) { 1082 1083 } 1084 1085 @Override 1086 public boolean unregisterFlushRequestListener(FlushRequestListener listener) { 1087 return false; 1088 } 1089 1090 @Override 1091 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 1092 1093 } 1094 } 1095 1096 private WALKeyImpl createWALKey(final TableName tableName, final RegionInfo hri, 1097 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) { 1098 return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); 1099 } 1100 1101 private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, 1102 int index) { 1103 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); 1104 byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); 1105 WALEdit edit = new WALEdit(); 1106 WALEditInternalHelper.addExtendedCell(edit, 1107 new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); 1108 return edit; 1109 } 1110 1111 private FSWALEntry createFSWALEntry(TableDescriptor htd, RegionInfo hri, long sequence, 1112 byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, 1113 int index, NavigableMap<byte[], Integer> scopes) throws IOException { 1114 FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), 1115 createWALEdit(rowName, family, ee, index), hri, true, null); 1116 entry.stampRegionSequenceId(mvcc.begin()); 1117 return entry; 1118 } 1119 1120 private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName, 1121 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, 1122 final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) 1123 throws IOException { 1124 for (int j = 0; j < count; j++) { 1125 wal.appendData(hri, createWALKey(tableName, hri, mvcc, scopes), 1126 createWALEdit(rowName, family, ee, j)); 1127 } 1128 wal.sync(); 1129 } 1130 1131 public static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count, 1132 EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { 1133 List<Put> puts = new ArrayList<>(); 1134 for (int j = 0; j < count; j++) { 1135 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); 1136 Put p = new Put(rowName); 1137 p.addColumn(family, qualifier, ee.currentTime(), rowName); 1138 r.put(p); 1139 puts.add(p); 1140 } 1141 return puts; 1142 } 1143 1144 /** 1145 * Creates an HRI around an HTD that has <code>tableName</code> and three column families named 1146 * 'a','b', and 'c'. 1147 * @param tableName Name of table to use when we create HTableDescriptor. 1148 */ 1149 private RegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) { 1150 return RegionInfoBuilder.newBuilder(tableName).build(); 1151 } 1152 1153 /** 1154 * Run the split. Verify only single split file made. 1155 * @return The single split file made 1156 */ 1157 private Path runWALSplit(final Configuration c) throws IOException { 1158 List<Path> splits = 1159 WALSplitter.split(hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); 1160 // Split should generate only 1 file since there's only 1 region 1161 assertEquals(1, splits.size(), "splits=" + splits); 1162 // Make sure the file exists 1163 assertTrue(fs.exists(splits.get(0))); 1164 LOG.info("Split file=" + splits.get(0)); 1165 return splits.get(0); 1166 } 1167 1168 private TableDescriptor createBasic3FamilyHTD(final TableName tableName) { 1169 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1170 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("a"))); 1171 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("b"))); 1172 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("c"))); 1173 return builder.build(); 1174 } 1175 1176 private void writerWALFile(Path file, List<FSWALEntry> entries) 1177 throws IOException, StreamLacksCapabilityException { 1178 fs.mkdirs(file.getParent()); 1179 ProtobufLogWriter writer = new ProtobufLogWriter(); 1180 writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file), 1181 StreamSlowMonitor.create(conf, "testMonitor")); 1182 for (FSWALEntry entry : entries) { 1183 writer.append(entry); 1184 } 1185 writer.sync(false); 1186 writer.close(); 1187 } 1188 1189 protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 1190 throws IOException; 1191}