001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataOutputStream; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellBuilderType; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 049import org.apache.hadoop.hbase.HBaseClassTestRule; 050import org.apache.hadoop.hbase.HBaseTestingUtil; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.KeyValue; 053import org.apache.hadoop.hbase.ServerName; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 056import org.apache.hadoop.hbase.client.Durability; 057import org.apache.hadoop.hbase.client.Get; 058import org.apache.hadoop.hbase.client.Put; 059import org.apache.hadoop.hbase.client.RegionInfo; 060import org.apache.hadoop.hbase.client.RegionInfoBuilder; 061import org.apache.hadoop.hbase.client.TableDescriptor; 062import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 063import org.apache.hadoop.hbase.executor.ExecutorService; 064import org.apache.hadoop.hbase.executor.ExecutorType; 065import org.apache.hadoop.hbase.io.hfile.HFile; 066import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 067import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 068import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 069import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 070import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 071import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 072import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 073import org.apache.hadoop.hbase.testclassification.LargeTests; 074import org.apache.hadoop.hbase.util.Bytes; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 076import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 077import org.apache.hadoop.hbase.util.FSUtils; 078import org.apache.hadoop.hbase.util.Pair; 079import org.apache.hadoop.hbase.util.Strings; 080import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 081import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 082import org.apache.hadoop.hbase.wal.WAL; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALFactory; 085import org.apache.hadoop.hbase.wal.WALKeyImpl; 086import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 087import org.apache.hadoop.hbase.wal.WALStreamReader; 088import org.junit.After; 089import org.junit.AfterClass; 090import org.junit.Before; 091import org.junit.BeforeClass; 092import org.junit.ClassRule; 093import org.junit.Rule; 094import org.junit.Test; 095import org.junit.experimental.categories.Category; 096import org.junit.rules.TestName; 097import org.mockito.Mockito; 098import org.slf4j.Logger; 099import org.slf4j.LoggerFactory; 100 101import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 102import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 103 104import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 114 115/** 116 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 117 * region replicas 118 */ 119@SuppressWarnings("deprecation") 120@Category(LargeTests.class) 121public class TestHRegionReplayEvents { 122 123 @ClassRule 124 public static final HBaseClassTestRule CLASS_RULE = 125 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 126 127 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 128 @Rule 129 public TestName name = new TestName(); 130 131 private static HBaseTestingUtil TEST_UTIL; 132 133 public static Configuration CONF; 134 private String dir; 135 136 private byte[][] families = 137 new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") }; 138 139 // Test names 140 protected byte[] tableName; 141 protected String method; 142 protected final byte[] row = Bytes.toBytes("rowA"); 143 protected final byte[] row2 = Bytes.toBytes("rowB"); 144 protected byte[] cq = Bytes.toBytes("cq"); 145 146 // per test fields 147 private Path rootDir; 148 private TableDescriptor htd; 149 private RegionServerServices rss; 150 private RegionInfo primaryHri, secondaryHri; 151 private HRegion primaryRegion, secondaryRegion; 152 private WAL walPrimary, walSecondary; 153 private WALStreamReader reader; 154 155 @BeforeClass 156 public static void setUpBeforeClass() throws Exception { 157 TEST_UTIL = new HBaseTestingUtil(); 158 TEST_UTIL.startMiniDFSCluster(1); 159 } 160 161 @AfterClass 162 public static void tearDownAfterClass() throws Exception { 163 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 164 TEST_UTIL.cleanupTestDir(); 165 TEST_UTIL.shutdownMiniDFSCluster(); 166 } 167 168 @Before 169 public void setUp() throws Exception { 170 CONF = TEST_UTIL.getConfiguration(); 171 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 172 method = name.getMethodName(); 173 tableName = Bytes.toBytes(name.getMethodName()); 174 rootDir = new Path(dir + method); 175 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 176 method = name.getMethodName(); 177 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 178 for (byte[] family : families) { 179 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 180 } 181 htd = builder.build(); 182 183 long time = EnvironmentEdgeManager.currentTime(); 184 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 185 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 186 primaryHri = 187 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 188 secondaryHri = 189 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 190 191 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 192 walPrimary = wals.getWAL(primaryHri); 193 walSecondary = wals.getWAL(secondaryHri); 194 195 rss = mock(RegionServerServices.class); 196 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 197 when(rss.getConfiguration()).thenReturn(CONF); 198 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 199 when(rss.getRegionServerSpaceQuotaManager()).thenReturn(null); // or mock it properly 200 when(rss.getFlushRequester()).thenReturn(mock(FlushRequester.class)); 201 when(rss.getCompactionRequestor()).thenReturn(mock(CompactionRequester.class)); 202 when(rss.getMetrics()).thenReturn(mock(MetricsRegionServer.class)); 203 String string = 204 org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString(); 205 ExecutorService es = new ExecutorService(string); 206 es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1) 207 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 208 when(rss.getExecutorService()).thenReturn(es); 209 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 210 primaryRegion.close(); 211 List<HRegion> regions = new ArrayList<>(); 212 regions.add(primaryRegion); 213 Mockito.doReturn(regions).when(rss).getRegions(); 214 215 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 216 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 217 218 reader = null; 219 } 220 221 @After 222 public void tearDown() throws Exception { 223 if (reader != null) { 224 reader.close(); 225 } 226 227 if (primaryRegion != null) { 228 HBaseTestingUtil.closeRegionAndWAL(primaryRegion); 229 } 230 if (secondaryRegion != null) { 231 HBaseTestingUtil.closeRegionAndWAL(secondaryRegion); 232 } 233 234 EnvironmentEdgeManagerTestHelper.reset(); 235 } 236 237 String getName() { 238 return name.getMethodName(); 239 } 240 241 // Some of the test cases are as follows: 242 // 1. replay flush start marker again 243 // 2. replay flush with smaller seqId than what is there in memstore snapshot 244 // 3. replay flush with larger seqId than what is there in memstore snapshot 245 // 4. replay flush commit without flush prepare. non droppable memstore 246 // 5. replay flush commit without flush prepare. droppable memstore 247 // 6. replay open region event 248 // 7. replay open region event after flush start 249 // 8. replay flush form an earlier seqId (test ignoring seqIds) 250 // 9. start flush does not prevent region from closing. 251 252 @Test 253 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 254 // load some data and flush ensure that the secondary replica will not execute the flush 255 256 // load some data to secondary by replaying 257 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 258 259 verifyData(secondaryRegion, 0, 1000, cq, families); 260 261 // flush region 262 FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true); 263 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 264 265 verifyData(secondaryRegion, 0, 1000, cq, families); 266 267 // close the region, and inspect that it has not flushed 268 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 269 // assert that there are no files (due to flush) 270 for (List<HStoreFile> f : files.values()) { 271 assertTrue(f.isEmpty()); 272 } 273 } 274 275 /** 276 * Tests a case where we replay only a flush start marker, then the region is closed. This region 277 * should not block indefinitely 278 */ 279 @Test 280 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 281 // load some data to primary and flush 282 int start = 0; 283 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 284 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 285 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 286 primaryRegion.flush(true); 287 288 // now replay the edits and the flush marker 289 reader = createWALReaderForPrimary(); 290 291 LOG.info("-- Replaying edits and flush events in secondary"); 292 while (true) { 293 WAL.Entry entry = reader.next(); 294 if (entry == null) { 295 break; 296 } 297 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 298 if (flushDesc != null) { 299 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 300 LOG.info("-- Replaying flush start in secondary"); 301 secondaryRegion.replayWALFlushStartMarker(flushDesc); 302 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 303 LOG.info("-- NOT Replaying flush commit in secondary"); 304 } 305 } else { 306 replayEdit(secondaryRegion, entry); 307 } 308 } 309 310 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 311 // now close the region which should not cause hold because of un-committed flush 312 secondaryRegion.close(); 313 314 // verify that the memstore size is back to what it was 315 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 316 } 317 318 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 319 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 320 return 0; // handled elsewhere 321 } 322 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 323 for (Cell cell : entry.getEdit().getCells()) 324 put.add(cell); 325 put.setDurability(Durability.SKIP_WAL); 326 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 327 region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId()); 328 return Integer.parseInt(Bytes.toString(put.getRow())); 329 } 330 331 private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException { 332 return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(), 333 AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); 334 } 335 336 @Test 337 public void testBatchReplayWithMultipleNonces() throws IOException { 338 try { 339 MutationReplay[] mutations = new MutationReplay[100]; 340 for (int i = 0; i < 100; i++) { 341 Put put = new Put(Bytes.toBytes(i)); 342 put.setDurability(Durability.SYNC_WAL); 343 for (byte[] familly : this.families) { 344 put.addColumn(familly, this.cq, null); 345 long nonceNum = i / 10; 346 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 347 } 348 } 349 primaryRegion.batchReplay(mutations, 20); 350 } catch (Exception e) { 351 String msg = "Error while replay of batch with multiple nonces. "; 352 LOG.error(msg, e); 353 fail(msg + e.getMessage()); 354 } 355 } 356 357 @Test 358 public void testReplayFlushesAndCompactions() throws IOException { 359 // initiate a secondary region with some data. 360 361 // load some data to primary and flush. 3 flushes and some more unflushed data 362 putDataWithFlushes(primaryRegion, 100, 300, 100); 363 364 // compaction from primary 365 LOG.info("-- Compacting primary, only 1 store"); 366 primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE); 367 368 // now replay the edits and the flush marker 369 reader = createWALReaderForPrimary(); 370 371 LOG.info("-- Replaying edits and flush events in secondary"); 372 int lastReplayed = 0; 373 int expectedStoreFileCount = 0; 374 while (true) { 375 WAL.Entry entry = reader.next(); 376 if (entry == null) { 377 break; 378 } 379 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 380 CompactionDescriptor compactionDesc = 381 WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 382 if (flushDesc != null) { 383 // first verify that everything is replayed and visible before flush event replay 384 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 385 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 386 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 387 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 388 MemStoreSize mss = store.getFlushableSize(); 389 long storeSize = store.getSize(); 390 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 391 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 392 LOG.info("-- Replaying flush start in secondary"); 393 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 394 assertNull(result.result); 395 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 396 397 // assert that the store memstore is smaller now 398 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 399 LOG.info("Memstore size reduced by:" 400 + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 401 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 402 403 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 404 LOG.info("-- Replaying flush commit in secondary"); 405 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 406 407 // assert that the flush files are picked 408 expectedStoreFileCount++; 409 for (HStore s : secondaryRegion.getStores()) { 410 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 411 } 412 MemStoreSize newMss = store.getFlushableSize(); 413 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 414 415 // assert that the region memstore is smaller now 416 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 417 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 418 419 // assert that the store sizes are bigger 420 assertTrue(store.getSize() > storeSize); 421 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 422 assertEquals(store.getSize(), store.getStorefilesSize()); 423 } 424 // after replay verify that everything is still visible 425 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 426 } else if (compactionDesc != null) { 427 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 428 429 // assert that the compaction is applied 430 for (HStore store : secondaryRegion.getStores()) { 431 StoreFileTracker sft = 432 StoreFileTrackerFactory.create(CONF, false, store.getStoreContext()); 433 if (store.getColumnFamilyName().equals("cf1")) { 434 assertEquals(1, store.getStorefilesCount()); 435 } else { 436 assertEquals(expectedStoreFileCount, sft.load().size()); 437 } 438 } 439 } else { 440 lastReplayed = replayEdit(secondaryRegion, entry); 441 } 442 } 443 444 assertEquals(400 - 1, lastReplayed); 445 LOG.info("-- Verifying edits from secondary"); 446 verifyData(secondaryRegion, 0, 400, cq, families); 447 448 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 449 verifyData(primaryRegion, 0, lastReplayed, cq, families); 450 for (HStore store : primaryRegion.getStores()) { 451 if (store.getColumnFamilyName().equals("cf1")) { 452 assertEquals(1, store.getStorefilesCount()); 453 } else { 454 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 455 } 456 } 457 } 458 459 /** 460 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 461 * equal to, greater or less than the previous flush start marker. 462 */ 463 @Test 464 public void testReplayFlushStartMarkers() throws IOException { 465 // load some data to primary and flush. 1 flush and some more unflushed data 466 putDataWithFlushes(primaryRegion, 100, 100, 100); 467 int numRows = 200; 468 469 // now replay the edits and the flush marker 470 reader = createWALReaderForPrimary(); 471 472 LOG.info("-- Replaying edits and flush events in secondary"); 473 474 FlushDescriptor startFlushDesc = null; 475 476 int lastReplayed = 0; 477 while (true) { 478 WAL.Entry entry = reader.next(); 479 if (entry == null) { 480 break; 481 } 482 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 483 if (flushDesc != null) { 484 // first verify that everything is replayed and visible before flush event replay 485 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 486 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 487 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 488 MemStoreSize mss = store.getFlushableSize(); 489 490 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 491 startFlushDesc = flushDesc; 492 LOG.info("-- Replaying flush start in secondary"); 493 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 494 assertNull(result.result); 495 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 496 assertTrue(regionMemstoreSize > 0); 497 assertTrue(mss.getHeapSize() > 0); 498 499 // assert that the store memstore is smaller now 500 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 501 LOG.info("Memstore size reduced by:" 502 + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 503 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 504 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 505 506 } 507 // after replay verify that everything is still visible 508 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 509 } else { 510 lastReplayed = replayEdit(secondaryRegion, entry); 511 } 512 } 513 514 // at this point, there should be some data (rows 0-100) in memstore snapshot 515 // and some more data in memstores (rows 100-200) 516 517 verifyData(secondaryRegion, 0, numRows, cq, families); 518 519 // Test case 1: replay the same flush start marker again 520 LOG.info("-- Replaying same flush start in secondary again"); 521 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 522 assertNull(result); // this should return null. Ignoring the flush start marker 523 // assert that we still have prepared flush with the previous setup. 524 assertNotNull(secondaryRegion.getPrepareFlushResult()); 525 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 526 startFlushDesc.getFlushSequenceNumber()); 527 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 528 verifyData(secondaryRegion, 0, numRows, cq, families); 529 530 // Test case 2: replay a flush start marker with a smaller seqId 531 FlushDescriptor startFlushDescSmallerSeqId = 532 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 533 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 534 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 535 assertNull(result); // this should return null. Ignoring the flush start marker 536 // assert that we still have prepared flush with the previous setup. 537 assertNotNull(secondaryRegion.getPrepareFlushResult()); 538 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 539 startFlushDesc.getFlushSequenceNumber()); 540 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 541 verifyData(secondaryRegion, 0, numRows, cq, families); 542 543 // Test case 3: replay a flush start marker with a larger seqId 544 FlushDescriptor startFlushDescLargerSeqId = 545 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 546 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 547 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 548 assertNull(result); // this should return null. Ignoring the flush start marker 549 // assert that we still have prepared flush with the previous setup. 550 assertNotNull(secondaryRegion.getPrepareFlushResult()); 551 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 552 startFlushDesc.getFlushSequenceNumber()); 553 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 554 verifyData(secondaryRegion, 0, numRows, cq, families); 555 556 LOG.info("-- Verifying edits from secondary"); 557 verifyData(secondaryRegion, 0, numRows, cq, families); 558 559 LOG.info("-- Verifying edits from primary."); 560 verifyData(primaryRegion, 0, numRows, cq, families); 561 } 562 563 /** 564 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 565 * less than the previous flush start marker. 566 */ 567 @Test 568 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 569 // load some data to primary and flush. 2 flushes and some more unflushed data 570 putDataWithFlushes(primaryRegion, 100, 200, 100); 571 int numRows = 300; 572 573 // now replay the edits and the flush marker 574 reader = createWALReaderForPrimary(); 575 576 LOG.info("-- Replaying edits and flush events in secondary"); 577 FlushDescriptor startFlushDesc = null; 578 FlushDescriptor commitFlushDesc = null; 579 580 int lastReplayed = 0; 581 while (true) { 582 System.out.println(lastReplayed); 583 WAL.Entry entry = reader.next(); 584 if (entry == null) { 585 break; 586 } 587 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 588 if (flushDesc != null) { 589 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 590 // don't replay the first flush start marker, hold on to it, replay the second one 591 if (startFlushDesc == null) { 592 startFlushDesc = flushDesc; 593 } else { 594 LOG.info("-- Replaying flush start in secondary"); 595 startFlushDesc = flushDesc; 596 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 597 assertNull(result.result); 598 } 599 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 600 // do not replay any flush commit yet 601 if (commitFlushDesc == null) { 602 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 603 } 604 } 605 // after replay verify that everything is still visible 606 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 607 } else { 608 lastReplayed = replayEdit(secondaryRegion, entry); 609 } 610 } 611 612 // at this point, there should be some data (rows 0-200) in memstore snapshot 613 // and some more data in memstores (rows 200-300) 614 verifyData(secondaryRegion, 0, numRows, cq, families); 615 616 // no store files in the region 617 int expectedStoreFileCount = 0; 618 for (HStore s : secondaryRegion.getStores()) { 619 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 620 } 621 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 622 623 // Test case 1: replay the a flush commit marker smaller than what we have prepared 624 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 625 + startFlushDesc); 626 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 627 628 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 629 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 630 631 // assert that the flush files are picked 632 expectedStoreFileCount++; 633 for (HStore s : secondaryRegion.getStores()) { 634 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 635 } 636 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 637 MemStoreSize mss = store.getFlushableSize(); 638 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 639 640 // assert that the region memstore is same as before 641 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 642 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 643 644 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 645 646 LOG.info("-- Verifying edits from secondary"); 647 verifyData(secondaryRegion, 0, numRows, cq, families); 648 649 LOG.info("-- Verifying edits from primary."); 650 verifyData(primaryRegion, 0, numRows, cq, families); 651 } 652 653 /** 654 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 655 * larger than the previous flush start marker. 656 */ 657 @Test 658 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 659 // load some data to primary and flush. 1 flush and some more unflushed data 660 putDataWithFlushes(primaryRegion, 100, 100, 100); 661 int numRows = 200; 662 663 // now replay the edits and the flush marker 664 reader = createWALReaderForPrimary(); 665 666 LOG.info("-- Replaying edits and flush events in secondary"); 667 FlushDescriptor startFlushDesc = null; 668 FlushDescriptor commitFlushDesc = null; 669 670 int lastReplayed = 0; 671 while (true) { 672 WAL.Entry entry = reader.next(); 673 if (entry == null) { 674 break; 675 } 676 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 677 if (flushDesc != null) { 678 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 679 if (startFlushDesc == null) { 680 LOG.info("-- Replaying flush start in secondary"); 681 startFlushDesc = flushDesc; 682 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 683 assertNull(result.result); 684 } 685 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 686 // do not replay any flush commit yet 687 // hold on to the flush commit marker but simulate a larger 688 // flush commit seqId 689 commitFlushDesc = FlushDescriptor.newBuilder(flushDesc) 690 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build(); 691 } 692 // after replay verify that everything is still visible 693 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 694 } else { 695 lastReplayed = replayEdit(secondaryRegion, entry); 696 } 697 } 698 699 // at this point, there should be some data (rows 0-100) in memstore snapshot 700 // and some more data in memstores (rows 100-200) 701 verifyData(secondaryRegion, 0, numRows, cq, families); 702 703 // no store files in the region 704 int expectedStoreFileCount = 0; 705 for (HStore s : secondaryRegion.getStores()) { 706 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 707 } 708 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 709 710 // Test case 1: replay the a flush commit marker larger than what we have prepared 711 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 712 + startFlushDesc); 713 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 714 715 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 716 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 717 718 // assert that the flush files are picked 719 expectedStoreFileCount++; 720 for (HStore s : secondaryRegion.getStores()) { 721 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 722 } 723 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 724 MemStoreSize mss = store.getFlushableSize(); 725 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 726 727 // assert that the region memstore is smaller than before, but not empty 728 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 729 assertTrue(newRegionMemstoreSize > 0); 730 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 731 732 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 733 734 LOG.info("-- Verifying edits from secondary"); 735 verifyData(secondaryRegion, 0, numRows, cq, families); 736 737 LOG.info("-- Verifying edits from primary."); 738 verifyData(primaryRegion, 0, numRows, cq, families); 739 } 740 741 /** 742 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 743 * memstore edits should be dropped after the flush commit replay since they should be in flushed 744 * files 745 */ 746 @Test 747 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 748 throws IOException { 749 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 750 } 751 752 /** 753 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 754 * memstore edits should be not dropped after the flush commit replay since not every edit will be 755 * in flushed files (based on seqId) 756 */ 757 @Test 758 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 759 throws IOException { 760 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 761 } 762 763 /** 764 * Tests the case where we receive a flush commit before receiving any flush prepare markers 765 */ 766 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 767 throws IOException { 768 // load some data to primary and flush. 1 flushes and some more unflushed data. 769 // write more data after flush depending on whether droppableSnapshot 770 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 771 int numRows = droppableMemstore ? 100 : 200; 772 773 // now replay the edits and the flush marker 774 reader = createWALReaderForPrimary(); 775 776 LOG.info("-- Replaying edits and flush events in secondary"); 777 FlushDescriptor commitFlushDesc = null; 778 779 int lastReplayed = 0; 780 while (true) { 781 WAL.Entry entry = reader.next(); 782 if (entry == null) { 783 break; 784 } 785 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 786 if (flushDesc != null) { 787 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 788 // do not replay flush start marker 789 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 790 commitFlushDesc = flushDesc; // hold on to the flush commit marker 791 } 792 // after replay verify that everything is still visible 793 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 794 } else { 795 lastReplayed = replayEdit(secondaryRegion, entry); 796 } 797 } 798 799 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 800 // and some more data in memstores (rows 100-300) 801 verifyData(secondaryRegion, 0, numRows, cq, families); 802 803 // no store files in the region 804 int expectedStoreFileCount = 0; 805 for (HStore s : secondaryRegion.getStores()) { 806 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 807 } 808 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 809 810 // Test case 1: replay a flush commit marker without start flush marker 811 assertNull(secondaryRegion.getPrepareFlushResult()); 812 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 813 814 // ensure all files are visible in secondary 815 for (HStore store : secondaryRegion.getStores()) { 816 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 817 } 818 819 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 820 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 821 822 // assert that the flush files are picked 823 expectedStoreFileCount++; 824 for (HStore s : secondaryRegion.getStores()) { 825 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 826 } 827 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 828 MemStoreSize mss = store.getFlushableSize(); 829 if (droppableMemstore) { 830 // assert that the memstore is dropped 831 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 832 } else { 833 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 834 } 835 836 // assert that the region memstore is same as before (we could not drop) 837 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 838 if (droppableMemstore) { 839 assertTrue(0 == newRegionMemstoreSize); 840 } else { 841 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 842 } 843 844 LOG.info("-- Verifying edits from secondary"); 845 verifyData(secondaryRegion, 0, numRows, cq, families); 846 847 LOG.info("-- Verifying edits from primary."); 848 verifyData(primaryRegion, 0, numRows, cq, families); 849 } 850 851 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 852 return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build(); 853 } 854 855 /** 856 * Tests replaying region open markers from primary region. Checks whether the files are picked up 857 */ 858 @Test 859 public void testReplayRegionOpenEvent() throws IOException { 860 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 861 int numRows = 100; 862 863 // close the region and open again. 864 primaryRegion.close(); 865 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 866 867 // now replay the edits and the flush marker 868 reader = createWALReaderForPrimary(); 869 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 870 871 LOG.info("-- Replaying edits and region events in secondary"); 872 while (true) { 873 WAL.Entry entry = reader.next(); 874 if (entry == null) { 875 break; 876 } 877 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 878 RegionEventDescriptor regionEventDesc = 879 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 880 881 if (flushDesc != null) { 882 // don't replay flush events 883 } else if (regionEventDesc != null) { 884 regionEvents.add(regionEventDesc); 885 } else { 886 // don't replay edits 887 } 888 } 889 890 // we should have 1 open, 1 close and 1 open event 891 assertEquals(3, regionEvents.size()); 892 893 // replay the first region open event. 894 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 895 896 // replay the close event as well 897 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 898 899 // no store files in the region 900 int expectedStoreFileCount = 0; 901 for (HStore s : secondaryRegion.getStores()) { 902 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 903 } 904 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 905 assertTrue(regionMemstoreSize == 0); 906 907 // now replay the region open event that should contain new file locations 908 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 909 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 910 911 // assert that the flush files are picked 912 expectedStoreFileCount++; 913 for (HStore s : secondaryRegion.getStores()) { 914 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 915 } 916 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 917 MemStoreSize mss = store.getFlushableSize(); 918 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 919 920 // assert that the region memstore is empty 921 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 922 assertTrue(newRegionMemstoreSize == 0); 923 924 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 925 // any 926 927 LOG.info("-- Verifying edits from secondary"); 928 verifyData(secondaryRegion, 0, numRows, cq, families); 929 930 LOG.info("-- Verifying edits from primary."); 931 verifyData(primaryRegion, 0, numRows, cq, families); 932 } 933 934 /** 935 * Tests the case where we replay a region open event after a flush start but before receiving 936 * flush commit 937 */ 938 @Test 939 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 940 putDataWithFlushes(primaryRegion, 100, 100, 100); 941 int numRows = 200; 942 943 // close the region and open again. 944 primaryRegion.close(); 945 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 946 947 // now replay the edits and the flush marker 948 reader = createWALReaderForPrimary(); 949 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 950 951 LOG.info("-- Replaying edits and region events in secondary"); 952 while (true) { 953 WAL.Entry entry = reader.next(); 954 if (entry == null) { 955 break; 956 } 957 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 958 RegionEventDescriptor regionEventDesc = 959 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 960 961 if (flushDesc != null) { 962 // only replay flush start 963 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 964 secondaryRegion.replayWALFlushStartMarker(flushDesc); 965 } 966 } else if (regionEventDesc != null) { 967 regionEvents.add(regionEventDesc); 968 } else { 969 replayEdit(secondaryRegion, entry); 970 } 971 } 972 973 // at this point, there should be some data (rows 0-100) in the memstore snapshot 974 // and some more data in memstores (rows 100-200) 975 verifyData(secondaryRegion, 0, numRows, cq, families); 976 977 // we should have 1 open, 1 close and 1 open event 978 assertEquals(3, regionEvents.size()); 979 980 // no store files in the region 981 int expectedStoreFileCount = 0; 982 for (HStore s : secondaryRegion.getStores()) { 983 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 984 } 985 986 // now replay the region open event that should contain new file locations 987 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 988 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 989 990 // assert that the flush files are picked 991 expectedStoreFileCount = 2; // two flushes happened 992 for (HStore s : secondaryRegion.getStores()) { 993 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 994 } 995 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 996 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 997 assertTrue(newSnapshotSize.getDataSize() == 0); 998 999 // assert that the region memstore is empty 1000 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 1001 assertTrue(newRegionMemstoreSize == 0); 1002 1003 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 1004 // any 1005 1006 LOG.info("-- Verifying edits from secondary"); 1007 verifyData(secondaryRegion, 0, numRows, cq, families); 1008 1009 LOG.info("-- Verifying edits from primary."); 1010 verifyData(primaryRegion, 0, numRows, cq, families); 1011 } 1012 1013 /** 1014 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1015 * of the last replayed region open event. 1016 */ 1017 @Test 1018 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1019 putDataWithFlushes(primaryRegion, 100, 100, 0); 1020 int numRows = 100; 1021 1022 // close the region and open again. 1023 primaryRegion.close(); 1024 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1025 1026 // now replay the edits and the flush marker 1027 reader = createWALReaderForPrimary(); 1028 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1029 List<WAL.Entry> edits = Lists.newArrayList(); 1030 1031 LOG.info("-- Replaying edits and region events in secondary"); 1032 while (true) { 1033 WAL.Entry entry = reader.next(); 1034 if (entry == null) { 1035 break; 1036 } 1037 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1038 RegionEventDescriptor regionEventDesc = 1039 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1040 1041 if (flushDesc != null) { 1042 // don't replay flushes 1043 } else if (regionEventDesc != null) { 1044 regionEvents.add(regionEventDesc); 1045 } else { 1046 edits.add(entry); 1047 } 1048 } 1049 1050 // replay the region open of first open, but with the seqid of the second open 1051 // this way non of the flush files will be picked up. 1052 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0)) 1053 .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build()); 1054 1055 // replay edits from the before region close. If replay does not 1056 // skip these the following verification will NOT fail. 1057 for (WAL.Entry entry : edits) { 1058 replayEdit(secondaryRegion, entry); 1059 } 1060 1061 boolean expectedFail = false; 1062 try { 1063 verifyData(secondaryRegion, 0, numRows, cq, families); 1064 } catch (AssertionError e) { 1065 expectedFail = true; // expected 1066 } 1067 if (!expectedFail) { 1068 fail("Should have failed this verification"); 1069 } 1070 } 1071 1072 @Test 1073 public void testReplayFlushSeqIds() throws IOException { 1074 // load some data to primary and flush 1075 int start = 0; 1076 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 1077 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1078 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1079 primaryRegion.flush(true); 1080 1081 // now replay the flush marker 1082 reader = createWALReaderForPrimary(); 1083 1084 long flushSeqId = -1; 1085 LOG.info("-- Replaying flush events in secondary"); 1086 while (true) { 1087 WAL.Entry entry = reader.next(); 1088 if (entry == null) { 1089 break; 1090 } 1091 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1092 if (flushDesc != null) { 1093 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1094 LOG.info("-- Replaying flush start in secondary"); 1095 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1096 flushSeqId = flushDesc.getFlushSequenceNumber(); 1097 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1098 LOG.info("-- Replaying flush commit in secondary"); 1099 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1100 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1101 } 1102 } 1103 // else do not replay 1104 } 1105 1106 // TODO: what to do with this? 1107 // assert that the newly picked up flush file is visible 1108 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1109 assertEquals(flushSeqId, readPoint); 1110 1111 // after replay verify that everything is still visible 1112 verifyData(secondaryRegion, 0, 100, cq, families); 1113 } 1114 1115 @Test 1116 public void testSeqIdsFromReplay() throws IOException { 1117 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1118 // original seqIds and they are made visible through mvcc read point upon replay 1119 String method = name.getMethodName(); 1120 byte[] tableName = Bytes.toBytes(method); 1121 byte[] family = Bytes.toBytes("family"); 1122 1123 HRegion region = initHRegion(tableName, family); 1124 try { 1125 // replay an entry that is bigger than current read point 1126 long readPoint = region.getMVCC().getReadPoint(); 1127 long origSeqId = readPoint + 100; 1128 1129 Put put = new Put(row).addColumn(family, row, row); 1130 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1131 replay(region, put, origSeqId); 1132 1133 // read point should have advanced to this seqId 1134 assertGet(region, family, row); 1135 1136 // region seqId should have advanced at least to this seqId 1137 assertEquals(origSeqId, region.getReadPoint(null)); 1138 1139 // replay an entry that is smaller than current read point 1140 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1141 // replay does not allow reads while replay is going on. 1142 put = new Put(row2).addColumn(family, row2, row2); 1143 put.setDurability(Durability.SKIP_WAL); 1144 replay(region, put, origSeqId - 50); 1145 1146 assertGet(region, family, row2); 1147 } finally { 1148 region.close(); 1149 } 1150 } 1151 1152 /** 1153 * Tests that a region opened in secondary mode would not write region open / close events to its 1154 * WAL. 1155 */ 1156 @Test 1157 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1158 secondaryRegion.close(); 1159 walSecondary = spy(walSecondary); 1160 1161 // test for region open and close 1162 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1163 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1164 any(WALEdit.class)); 1165 1166 // test for replay prepare flush 1167 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1168 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder() 1169 .setFlushSequenceNumber(10) 1170 .setTableName(UnsafeByteOperations 1171 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1172 .setAction(FlushAction.START_FLUSH) 1173 .setEncodedRegionName( 1174 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1175 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1176 .build()); 1177 1178 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1179 any(WALEdit.class)); 1180 1181 secondaryRegion.close(); 1182 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1183 any(WALEdit.class)); 1184 } 1185 1186 /** 1187 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1188 */ 1189 @Test 1190 public void testRegionReadsEnabledFlag() throws IOException { 1191 1192 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1193 1194 verifyData(secondaryRegion, 0, 100, cq, families); 1195 1196 // now disable reads 1197 secondaryRegion.setReadsEnabled(false); 1198 try { 1199 verifyData(secondaryRegion, 0, 100, cq, families); 1200 fail("Should have failed with IOException"); 1201 } catch (IOException ex) { 1202 // expected 1203 } 1204 1205 // verify that we can still replay data 1206 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1207 1208 // now enable reads again 1209 secondaryRegion.setReadsEnabled(true); 1210 verifyData(secondaryRegion, 0, 200, cq, families); 1211 } 1212 1213 /** 1214 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1215 * It should write the flush request marker instead. 1216 */ 1217 @Test 1218 public void testWriteFlushRequestMarker() throws IOException { 1219 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1220 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1221 assertNotNull(result); 1222 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1223 assertFalse(result.wroteFlushWalMarker); 1224 1225 // request flush again, but this time with writeFlushRequestWalMarker = true 1226 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1227 assertNotNull(result); 1228 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1229 assertTrue(result.wroteFlushWalMarker); 1230 1231 List<FlushDescriptor> flushes = Lists.newArrayList(); 1232 reader = createWALReaderForPrimary(); 1233 while (true) { 1234 WAL.Entry entry = reader.next(); 1235 if (entry == null) { 1236 break; 1237 } 1238 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1239 if (flush != null) { 1240 flushes.add(flush); 1241 } 1242 } 1243 1244 assertEquals(1, flushes.size()); 1245 assertNotNull(flushes.get(0)); 1246 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1247 } 1248 1249 /** 1250 * Test the case where the secondary region replica is not in reads enabled state because it is 1251 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush 1252 * marker entry should restore the reads enabled status in the region and allow the reads to 1253 * continue. 1254 */ 1255 @Test 1256 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1257 disableReads(secondaryRegion); 1258 1259 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1260 // triggered flush restores readsEnabled 1261 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1262 reader = createWALReaderForPrimary(); 1263 while (true) { 1264 WAL.Entry entry = reader.next(); 1265 if (entry == null) { 1266 break; 1267 } 1268 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1269 if (flush != null) { 1270 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1271 } 1272 } 1273 1274 // now reads should be enabled 1275 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1276 } 1277 1278 /** 1279 * Test the case where the secondary region replica is not in reads enabled state because it is 1280 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1281 * entries should restore the reads enabled status in the region and allow the reads to continue. 1282 */ 1283 @Test 1284 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1285 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1286 // from triggered flush restores readsEnabled 1287 disableReads(secondaryRegion); 1288 1289 // put some data in primary 1290 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1291 primaryRegion.flush(true); 1292 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1293 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1294 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1295 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1296 // but can't figure it... and this is only test that seems to suffer this flush issue. 1297 // St.Ack 20160201 1298 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1299 1300 reader = createWALReaderForPrimary(); 1301 while (true) { 1302 WAL.Entry entry = reader.next(); 1303 LOG.info(Objects.toString(entry)); 1304 if (entry == null) { 1305 break; 1306 } 1307 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1308 if (flush != null) { 1309 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1310 } else { 1311 replayEdit(secondaryRegion, entry); 1312 } 1313 } 1314 1315 // now reads should be enabled 1316 verifyData(secondaryRegion, 0, 100, cq, families); 1317 } 1318 1319 /** 1320 * Test the case where the secondary region replica is not in reads enabled state because it is 1321 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1322 * entries should restore the reads enabled status in the region and allow the reads to continue. 1323 */ 1324 @Test 1325 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1326 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1327 // from triggered flush restores readsEnabled 1328 disableReads(secondaryRegion); 1329 1330 // put some data in primary 1331 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1332 primaryRegion.flush(true); 1333 1334 reader = createWALReaderForPrimary(); 1335 while (true) { 1336 WAL.Entry entry = reader.next(); 1337 if (entry == null) { 1338 break; 1339 } 1340 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1341 if (flush != null) { 1342 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1343 } 1344 } 1345 1346 // now reads should be enabled 1347 verifyData(secondaryRegion, 0, 100, cq, families); 1348 } 1349 1350 /** 1351 * Test the case where the secondary region replica is not in reads enabled state because it is 1352 * waiting for a flush or region open marker from primary region. Replaying region open event 1353 * entry from primary should restore the reads enabled status in the region and allow the reads to 1354 * continue. 1355 */ 1356 @Test 1357 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1358 // Test case 3: Test that replaying region open event markers restores readsEnabled 1359 disableReads(secondaryRegion); 1360 1361 primaryRegion.close(); 1362 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1363 1364 reader = createWALReaderForPrimary(); 1365 while (true) { 1366 WAL.Entry entry = reader.next(); 1367 if (entry == null) { 1368 break; 1369 } 1370 1371 RegionEventDescriptor regionEventDesc = 1372 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1373 1374 if (regionEventDesc != null) { 1375 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1376 } 1377 } 1378 1379 // now reads should be enabled 1380 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1381 } 1382 1383 @Test 1384 public void testRefresStoreFiles() throws IOException { 1385 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1386 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1387 1388 // Test case 1: refresh with an empty region 1389 secondaryRegion.refreshStoreFiles(); 1390 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1391 1392 // do one flush 1393 putDataWithFlushes(primaryRegion, 100, 100, 0); 1394 int numRows = 100; 1395 1396 // refresh the store file list, and ensure that the files are picked up. 1397 secondaryRegion.refreshStoreFiles(); 1398 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1399 secondaryRegion.getStoreFileList(families)); 1400 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1401 1402 LOG.info("-- Verifying edits from secondary"); 1403 verifyData(secondaryRegion, 0, numRows, cq, families); 1404 1405 // Test case 2: 3 some more flushes 1406 putDataWithFlushes(primaryRegion, 100, 300, 0); 1407 numRows = 300; 1408 1409 // refresh the store file list, and ensure that the files are picked up. 1410 secondaryRegion.refreshStoreFiles(); 1411 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1412 secondaryRegion.getStoreFileList(families)); 1413 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1414 1415 LOG.info("-- Verifying edits from secondary"); 1416 verifyData(secondaryRegion, 0, numRows, cq, families); 1417 1418 if (FSUtils.WINDOWS) { 1419 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1420 return; 1421 } 1422 1423 // Test case 3: compact primary files 1424 primaryRegion.compactStores(); 1425 List<HRegion> regions = new ArrayList<>(); 1426 regions.add(primaryRegion); 1427 Mockito.doReturn(regions).when(rss).getRegions(); 1428 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1429 cleaner.chore(); 1430 secondaryRegion.refreshStoreFiles(); 1431 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1432 secondaryRegion.getStoreFileList(families)); 1433 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1434 1435 LOG.info("-- Verifying edits from secondary"); 1436 verifyData(secondaryRegion, 0, numRows, cq, families); 1437 1438 LOG.info("-- Replaying edits in secondary"); 1439 1440 // Test case 4: replay some edits, ensure that memstore is dropped. 1441 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1442 putDataWithFlushes(primaryRegion, 400, 400, 0); 1443 numRows = 400; 1444 1445 reader = createWALReaderForPrimary(); 1446 while (true) { 1447 WAL.Entry entry = reader.next(); 1448 if (entry == null) { 1449 break; 1450 } 1451 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1452 if (flush != null) { 1453 // do not replay flush 1454 } else { 1455 replayEdit(secondaryRegion, entry); 1456 } 1457 } 1458 1459 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1460 1461 secondaryRegion.refreshStoreFiles(); 1462 1463 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1464 1465 LOG.info("-- Verifying edits from primary"); 1466 verifyData(primaryRegion, 0, numRows, cq, families); 1467 LOG.info("-- Verifying edits from secondary"); 1468 verifyData(secondaryRegion, 0, numRows, cq, families); 1469 } 1470 1471 /** 1472 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1473 */ 1474 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1475 List<Path> l1 = new ArrayList<>(list1.size()); 1476 for (String path : list1) { 1477 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1478 } 1479 List<Path> l2 = new ArrayList<>(list2.size()); 1480 for (String path : list2) { 1481 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1482 } 1483 assertEquals(l1, l2); 1484 } 1485 1486 private void disableReads(HRegion region) { 1487 region.setReadsEnabled(false); 1488 try { 1489 verifyData(region, 0, 1, cq, families); 1490 fail("Should have failed with IOException"); 1491 } catch (IOException ex) { 1492 // expected 1493 } 1494 } 1495 1496 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1497 put.setDurability(Durability.SKIP_WAL); 1498 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1499 region.batchReplay(new MutationReplay[] { mutation }, replaySeqId); 1500 } 1501 1502 /** 1503 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1504 */ 1505 @Test 1506 public void testReplayBulkLoadEvent() throws IOException { 1507 LOG.info("testReplayBulkLoadEvent starts"); 1508 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1509 1510 // close the region and open again. 1511 primaryRegion.close(); 1512 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1513 1514 // bulk load a file into primary region 1515 byte[] randomValues = new byte[20]; 1516 Bytes.random(randomValues); 1517 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1518 1519 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1520 int expectedLoadFileCount = 0; 1521 for (byte[] family : families) { 1522 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1523 expectedLoadFileCount++; 1524 } 1525 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1526 1527 // now replay the edits and the bulk load marker 1528 reader = createWALReaderForPrimary(); 1529 1530 LOG.info("-- Replaying edits and region events in secondary"); 1531 BulkLoadDescriptor bulkloadEvent = null; 1532 while (true) { 1533 WAL.Entry entry = reader.next(); 1534 if (entry == null) { 1535 break; 1536 } 1537 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1538 if (bulkloadEvent != null) { 1539 break; 1540 } 1541 } 1542 1543 // we should have 1 bulk load event 1544 assertTrue(bulkloadEvent != null); 1545 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1546 1547 // replay the bulk load event 1548 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1549 1550 List<String> storeFileNames = new ArrayList<>(); 1551 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1552 storeFileNames.addAll(storeDesc.getStoreFileList()); 1553 } 1554 // assert that the bulk loaded files are picked 1555 for (HStore s : secondaryRegion.getStores()) { 1556 for (HStoreFile sf : s.getStorefiles()) { 1557 storeFileNames.remove(sf.getPath().getName()); 1558 } 1559 } 1560 assertTrue("Found some store file isn't loaded:" + storeFileNames, storeFileNames.isEmpty()); 1561 1562 LOG.info("-- Verifying edits from secondary"); 1563 for (byte[] family : families) { 1564 assertGet(secondaryRegion, family, randomValues); 1565 } 1566 } 1567 1568 @Test 1569 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1570 // tests replaying flush commit marker, but the flush file has already been compacted 1571 // from primary and also deleted from the archive directory 1572 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() 1573 .setFlushSequenceNumber(Long.MAX_VALUE) 1574 .setTableName(UnsafeByteOperations 1575 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1576 .setAction(FlushAction.COMMIT_FLUSH) 1577 .setEncodedRegionName( 1578 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1579 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1580 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1581 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1582 .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build()) 1583 .build()); 1584 } 1585 1586 @Test 1587 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1588 // tests replaying compaction marker, but the compaction output file has already been compacted 1589 // from primary and also deleted from the archive directory 1590 secondaryRegion 1591 .replayWALCompactionMarker( 1592 CompactionDescriptor.newBuilder() 1593 .setTableName(UnsafeByteOperations 1594 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1595 .setEncodedRegionName( 1596 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1597 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123") 1598 .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir") 1599 .setRegionName( 1600 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1601 .build(), 1602 true, true, Long.MAX_VALUE); 1603 } 1604 1605 @Test 1606 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1607 // tests replaying region open event marker, but the region files have already been compacted 1608 // from primary and also deleted from the archive directory 1609 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1610 .setTableName(UnsafeByteOperations 1611 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1612 .setEncodedRegionName( 1613 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1614 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1615 .setEventType(EventType.REGION_OPEN) 1616 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1617 .setLogSequenceNumber(Long.MAX_VALUE) 1618 .addStores( 1619 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1620 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1621 .build()); 1622 } 1623 1624 @Test 1625 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1626 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1627 // from primary and also deleted from the archive directory 1628 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1629 .setTableName( 1630 ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1631 .setEncodedRegionName( 1632 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1633 .setBulkloadSeqNum(Long.MAX_VALUE) 1634 .addStores( 1635 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1636 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1637 .build()); 1638 } 1639 1640 private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes) 1641 throws IOException { 1642 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1643 // TODO We need a way to do this without creating files 1644 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1645 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1646 try { 1647 hFileFactory.withOutputStream(out); 1648 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1649 HFile.Writer writer = hFileFactory.create(); 1650 try { 1651 writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 1652 .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L) 1653 .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build())); 1654 } finally { 1655 writer.close(); 1656 } 1657 } finally { 1658 out.close(); 1659 } 1660 return testFile.toString(); 1661 } 1662 1663 /** 1664 * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush 1665 * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but 1666 * does not execute flush after 1667 */ 1668 private void putDataWithFlushes(HRegion region, int flushInterval, int numRows, 1669 int numRowsAfterFlush) throws IOException { 1670 int start = 0; 1671 for (; start < numRows; start += flushInterval) { 1672 LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval)); 1673 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1674 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1675 region.flush(true); 1676 } 1677 LOG.info("-- Writing some more data to primary, not flushing"); 1678 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1679 } 1680 1681 private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf, 1682 byte[]... families) throws IOException { 1683 for (int i = startRow; i < startRow + numRows; i++) { 1684 Put put = new Put(Bytes.toBytes("" + i)); 1685 put.setDurability(Durability.SKIP_WAL); 1686 for (byte[] family : families) { 1687 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1688 } 1689 replay(region, put, i + 1); 1690 } 1691 } 1692 1693 private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException { 1694 return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, 1695 HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families); 1696 } 1697}