001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertFalse; 025import static org.junit.jupiter.api.Assertions.assertNotNull; 026import static org.junit.jupiter.api.Assertions.assertNull; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.junit.jupiter.api.Assertions.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataOutputStream; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellBuilderType; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; 049import org.apache.hadoop.hbase.HBaseTestingUtil; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.RegionInfoBuilder; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.executor.ExecutorService; 063import org.apache.hadoop.hbase.executor.ExecutorType; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 066import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 067import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 068import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; 069import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; 070import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 071import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 072import org.apache.hadoop.hbase.testclassification.LargeTests; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 076import org.apache.hadoop.hbase.util.FSUtils; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.hbase.util.Strings; 079import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 080import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader; 081import org.apache.hadoop.hbase.wal.WAL; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALFactory; 084import org.apache.hadoop.hbase.wal.WALKeyImpl; 085import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 086import org.apache.hadoop.hbase.wal.WALStreamReader; 087import org.junit.jupiter.api.AfterAll; 088import org.junit.jupiter.api.AfterEach; 089import org.junit.jupiter.api.BeforeAll; 090import org.junit.jupiter.api.BeforeEach; 091import org.junit.jupiter.api.Tag; 092import org.junit.jupiter.api.Test; 093import org.junit.jupiter.api.TestInfo; 094import org.mockito.Mockito; 095import org.slf4j.Logger; 096import org.slf4j.LoggerFactory; 097 098import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 099import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 100 101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 111 112/** 113 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 114 * region replicas 115 */ 116@SuppressWarnings("deprecation") 117@Tag(LargeTests.TAG) 118public class TestHRegionReplayEvents { 119 120 private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class); 121 private String name; 122 123 private static HBaseTestingUtil TEST_UTIL; 124 125 public static Configuration CONF; 126 private String dir; 127 128 private byte[][] families = 129 new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") }; 130 131 // Test names 132 protected byte[] tableName; 133 protected String method; 134 protected final byte[] row = Bytes.toBytes("rowA"); 135 protected final byte[] row2 = Bytes.toBytes("rowB"); 136 protected byte[] cq = Bytes.toBytes("cq"); 137 138 // per test fields 139 private Path rootDir; 140 private TableDescriptor htd; 141 private RegionServerServices rss; 142 private RegionInfo primaryHri, secondaryHri; 143 private HRegion primaryRegion, secondaryRegion; 144 private WAL walPrimary, walSecondary; 145 private WALStreamReader reader; 146 147 @BeforeAll 148 public static void setUpBeforeClass() throws Exception { 149 TEST_UTIL = new HBaseTestingUtil(); 150 TEST_UTIL.startMiniDFSCluster(1); 151 } 152 153 @AfterAll 154 public static void tearDownAfterClass() throws Exception { 155 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 156 TEST_UTIL.cleanupTestDir(); 157 TEST_UTIL.shutdownMiniDFSCluster(); 158 } 159 160 @BeforeEach 161 public void setUp(TestInfo testInfo) throws Exception { 162 this.name = testInfo.getTestMethod().get().getName(); 163 CONF = TEST_UTIL.getConfiguration(); 164 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 165 method = name; 166 tableName = Bytes.toBytes(name); 167 rootDir = new Path(dir + method); 168 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 169 method = name; 170 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 171 for (byte[] family : families) { 172 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 173 } 174 htd = builder.build(); 175 176 long time = EnvironmentEdgeManager.currentTime(); 177 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 178 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 179 primaryHri = 180 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 181 secondaryHri = 182 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 183 184 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 185 walPrimary = wals.getWAL(primaryHri); 186 walSecondary = wals.getWAL(secondaryHri); 187 188 rss = mock(RegionServerServices.class); 189 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 190 when(rss.getConfiguration()).thenReturn(CONF); 191 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 192 when(rss.getRegionServerSpaceQuotaManager()).thenReturn(null); // or mock it properly 193 when(rss.getFlushRequester()).thenReturn(mock(FlushRequester.class)); 194 when(rss.getCompactionRequestor()).thenReturn(mock(CompactionRequester.class)); 195 when(rss.getMetrics()).thenReturn(mock(MetricsRegionServer.class)); 196 String string = 197 org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString(); 198 ExecutorService es = new ExecutorService(string); 199 es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1) 200 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 201 when(rss.getExecutorService()).thenReturn(es); 202 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 203 primaryRegion.close(); 204 List<HRegion> regions = new ArrayList<>(); 205 regions.add(primaryRegion); 206 Mockito.doReturn(regions).when(rss).getRegions(); 207 208 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 209 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 210 211 reader = null; 212 } 213 214 @AfterEach 215 public void tearDown() throws Exception { 216 if (reader != null) { 217 reader.close(); 218 } 219 220 if (primaryRegion != null) { 221 HBaseTestingUtil.closeRegionAndWAL(primaryRegion); 222 } 223 if (secondaryRegion != null) { 224 HBaseTestingUtil.closeRegionAndWAL(secondaryRegion); 225 } 226 227 EnvironmentEdgeManagerTestHelper.reset(); 228 } 229 230 String getName() { 231 return name; 232 } 233 234 // Some of the test cases are as follows: 235 // 1. replay flush start marker again 236 // 2. replay flush with smaller seqId than what is there in memstore snapshot 237 // 3. replay flush with larger seqId than what is there in memstore snapshot 238 // 4. replay flush commit without flush prepare. non droppable memstore 239 // 5. replay flush commit without flush prepare. droppable memstore 240 // 6. replay open region event 241 // 7. replay open region event after flush start 242 // 8. replay flush form an earlier seqId (test ignoring seqIds) 243 // 9. start flush does not prevent region from closing. 244 245 @Test 246 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 247 // load some data and flush ensure that the secondary replica will not execute the flush 248 249 // load some data to secondary by replaying 250 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 251 252 verifyData(secondaryRegion, 0, 1000, cq, families); 253 254 // flush region 255 FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true); 256 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 257 258 verifyData(secondaryRegion, 0, 1000, cq, families); 259 260 // close the region, and inspect that it has not flushed 261 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 262 // assert that there are no files (due to flush) 263 for (List<HStoreFile> f : files.values()) { 264 assertTrue(f.isEmpty()); 265 } 266 } 267 268 /** 269 * Tests a case where we replay only a flush start marker, then the region is closed. This region 270 * should not block indefinitely 271 */ 272 @Test 273 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 274 // load some data to primary and flush 275 int start = 0; 276 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 277 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 278 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 279 primaryRegion.flush(true); 280 281 // now replay the edits and the flush marker 282 reader = createWALReaderForPrimary(); 283 284 LOG.info("-- Replaying edits and flush events in secondary"); 285 while (true) { 286 WAL.Entry entry = reader.next(); 287 if (entry == null) { 288 break; 289 } 290 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 291 if (flushDesc != null) { 292 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 293 LOG.info("-- Replaying flush start in secondary"); 294 secondaryRegion.replayWALFlushStartMarker(flushDesc); 295 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 296 LOG.info("-- NOT Replaying flush commit in secondary"); 297 } 298 } else { 299 replayEdit(secondaryRegion, entry); 300 } 301 } 302 303 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 304 // now close the region which should not cause hold because of un-committed flush 305 secondaryRegion.close(); 306 307 // verify that the memstore size is back to what it was 308 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 309 } 310 311 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 312 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 313 return 0; // handled elsewhere 314 } 315 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 316 for (Cell cell : entry.getEdit().getCells()) 317 put.add(cell); 318 put.setDurability(Durability.SKIP_WAL); 319 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 320 region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId()); 321 return Integer.parseInt(Bytes.toString(put.getRow())); 322 } 323 324 private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException { 325 return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(), 326 AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); 327 } 328 329 @Test 330 public void testBatchReplayWithMultipleNonces() throws IOException { 331 try { 332 MutationReplay[] mutations = new MutationReplay[100]; 333 for (int i = 0; i < 100; i++) { 334 Put put = new Put(Bytes.toBytes(i)); 335 put.setDurability(Durability.SYNC_WAL); 336 for (byte[] familly : this.families) { 337 put.addColumn(familly, this.cq, null); 338 long nonceNum = i / 10; 339 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 340 } 341 } 342 primaryRegion.batchReplay(mutations, 20); 343 } catch (Exception e) { 344 String msg = "Error while replay of batch with multiple nonces. "; 345 LOG.error(msg, e); 346 fail(msg + e.getMessage()); 347 } 348 } 349 350 @Test 351 public void testReplayFlushesAndCompactions() throws IOException { 352 // initiate a secondary region with some data. 353 354 // load some data to primary and flush. 3 flushes and some more unflushed data 355 putDataWithFlushes(primaryRegion, 100, 300, 100); 356 357 // compaction from primary 358 LOG.info("-- Compacting primary, only 1 store"); 359 primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE); 360 361 // now replay the edits and the flush marker 362 reader = createWALReaderForPrimary(); 363 364 LOG.info("-- Replaying edits and flush events in secondary"); 365 int lastReplayed = 0; 366 int expectedStoreFileCount = 0; 367 while (true) { 368 WAL.Entry entry = reader.next(); 369 if (entry == null) { 370 break; 371 } 372 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 373 CompactionDescriptor compactionDesc = 374 WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 375 if (flushDesc != null) { 376 // first verify that everything is replayed and visible before flush event replay 377 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 378 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 379 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 380 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 381 MemStoreSize mss = store.getFlushableSize(); 382 long storeSize = store.getSize(); 383 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 384 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 385 LOG.info("-- Replaying flush start in secondary"); 386 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 387 assertNull(result.result); 388 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 389 390 // assert that the store memstore is smaller now 391 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 392 LOG.info("Memstore size reduced by:" 393 + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 394 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 395 396 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 397 LOG.info("-- Replaying flush commit in secondary"); 398 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 399 400 // assert that the flush files are picked 401 expectedStoreFileCount++; 402 for (HStore s : secondaryRegion.getStores()) { 403 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 404 } 405 MemStoreSize newMss = store.getFlushableSize(); 406 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 407 408 // assert that the region memstore is smaller now 409 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 410 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 411 412 // assert that the store sizes are bigger 413 assertTrue(store.getSize() > storeSize); 414 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 415 assertEquals(store.getSize(), store.getStorefilesSize()); 416 } 417 // after replay verify that everything is still visible 418 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 419 } else if (compactionDesc != null) { 420 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 421 422 // assert that the compaction is applied 423 for (HStore store : secondaryRegion.getStores()) { 424 StoreFileTracker sft = 425 StoreFileTrackerFactory.create(CONF, false, store.getStoreContext()); 426 if (store.getColumnFamilyName().equals("cf1")) { 427 assertEquals(1, store.getStorefilesCount()); 428 } else { 429 assertEquals(expectedStoreFileCount, sft.load().size()); 430 } 431 } 432 } else { 433 lastReplayed = replayEdit(secondaryRegion, entry); 434 } 435 } 436 437 assertEquals(400 - 1, lastReplayed); 438 LOG.info("-- Verifying edits from secondary"); 439 verifyData(secondaryRegion, 0, 400, cq, families); 440 441 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 442 verifyData(primaryRegion, 0, lastReplayed, cq, families); 443 for (HStore store : primaryRegion.getStores()) { 444 if (store.getColumnFamilyName().equals("cf1")) { 445 assertEquals(1, store.getStorefilesCount()); 446 } else { 447 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 448 } 449 } 450 } 451 452 /** 453 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 454 * equal to, greater or less than the previous flush start marker. 455 */ 456 @Test 457 public void testReplayFlushStartMarkers() throws IOException { 458 // load some data to primary and flush. 1 flush and some more unflushed data 459 putDataWithFlushes(primaryRegion, 100, 100, 100); 460 int numRows = 200; 461 462 // now replay the edits and the flush marker 463 reader = createWALReaderForPrimary(); 464 465 LOG.info("-- Replaying edits and flush events in secondary"); 466 467 FlushDescriptor startFlushDesc = null; 468 469 int lastReplayed = 0; 470 while (true) { 471 WAL.Entry entry = reader.next(); 472 if (entry == null) { 473 break; 474 } 475 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 476 if (flushDesc != null) { 477 // first verify that everything is replayed and visible before flush event replay 478 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 479 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 480 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 481 MemStoreSize mss = store.getFlushableSize(); 482 483 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 484 startFlushDesc = flushDesc; 485 LOG.info("-- Replaying flush start in secondary"); 486 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 487 assertNull(result.result); 488 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 489 assertTrue(regionMemstoreSize > 0); 490 assertTrue(mss.getHeapSize() > 0); 491 492 // assert that the store memstore is smaller now 493 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 494 LOG.info("Memstore size reduced by:" 495 + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 496 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 497 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 498 499 } 500 // after replay verify that everything is still visible 501 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 502 } else { 503 lastReplayed = replayEdit(secondaryRegion, entry); 504 } 505 } 506 507 // at this point, there should be some data (rows 0-100) in memstore snapshot 508 // and some more data in memstores (rows 100-200) 509 510 verifyData(secondaryRegion, 0, numRows, cq, families); 511 512 // Test case 1: replay the same flush start marker again 513 LOG.info("-- Replaying same flush start in secondary again"); 514 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 515 assertNull(result); // this should return null. Ignoring the flush start marker 516 // assert that we still have prepared flush with the previous setup. 517 assertNotNull(secondaryRegion.getPrepareFlushResult()); 518 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 519 startFlushDesc.getFlushSequenceNumber()); 520 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 521 verifyData(secondaryRegion, 0, numRows, cq, families); 522 523 // Test case 2: replay a flush start marker with a smaller seqId 524 FlushDescriptor startFlushDescSmallerSeqId = 525 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 526 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 527 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 528 assertNull(result); // this should return null. Ignoring the flush start marker 529 // assert that we still have prepared flush with the previous setup. 530 assertNotNull(secondaryRegion.getPrepareFlushResult()); 531 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 532 startFlushDesc.getFlushSequenceNumber()); 533 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 534 verifyData(secondaryRegion, 0, numRows, cq, families); 535 536 // Test case 3: replay a flush start marker with a larger seqId 537 FlushDescriptor startFlushDescLargerSeqId = 538 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 539 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 540 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 541 assertNull(result); // this should return null. Ignoring the flush start marker 542 // assert that we still have prepared flush with the previous setup. 543 assertNotNull(secondaryRegion.getPrepareFlushResult()); 544 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 545 startFlushDesc.getFlushSequenceNumber()); 546 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 547 verifyData(secondaryRegion, 0, numRows, cq, families); 548 549 LOG.info("-- Verifying edits from secondary"); 550 verifyData(secondaryRegion, 0, numRows, cq, families); 551 552 LOG.info("-- Verifying edits from primary."); 553 verifyData(primaryRegion, 0, numRows, cq, families); 554 } 555 556 /** 557 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 558 * less than the previous flush start marker. 559 */ 560 @Test 561 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 562 // load some data to primary and flush. 2 flushes and some more unflushed data 563 putDataWithFlushes(primaryRegion, 100, 200, 100); 564 int numRows = 300; 565 566 // now replay the edits and the flush marker 567 reader = createWALReaderForPrimary(); 568 569 LOG.info("-- Replaying edits and flush events in secondary"); 570 FlushDescriptor startFlushDesc = null; 571 FlushDescriptor commitFlushDesc = null; 572 573 int lastReplayed = 0; 574 while (true) { 575 System.out.println(lastReplayed); 576 WAL.Entry entry = reader.next(); 577 if (entry == null) { 578 break; 579 } 580 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 581 if (flushDesc != null) { 582 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 583 // don't replay the first flush start marker, hold on to it, replay the second one 584 if (startFlushDesc == null) { 585 startFlushDesc = flushDesc; 586 } else { 587 LOG.info("-- Replaying flush start in secondary"); 588 startFlushDesc = flushDesc; 589 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 590 assertNull(result.result); 591 } 592 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 593 // do not replay any flush commit yet 594 if (commitFlushDesc == null) { 595 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 596 } 597 } 598 // after replay verify that everything is still visible 599 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 600 } else { 601 lastReplayed = replayEdit(secondaryRegion, entry); 602 } 603 } 604 605 // at this point, there should be some data (rows 0-200) in memstore snapshot 606 // and some more data in memstores (rows 200-300) 607 verifyData(secondaryRegion, 0, numRows, cq, families); 608 609 // no store files in the region 610 int expectedStoreFileCount = 0; 611 for (HStore s : secondaryRegion.getStores()) { 612 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 613 } 614 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 615 616 // Test case 1: replay the a flush commit marker smaller than what we have prepared 617 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 618 + startFlushDesc); 619 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 620 621 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 622 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 623 624 // assert that the flush files are picked 625 expectedStoreFileCount++; 626 for (HStore s : secondaryRegion.getStores()) { 627 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 628 } 629 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 630 MemStoreSize mss = store.getFlushableSize(); 631 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 632 633 // assert that the region memstore is same as before 634 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 635 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 636 637 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 638 639 LOG.info("-- Verifying edits from secondary"); 640 verifyData(secondaryRegion, 0, numRows, cq, families); 641 642 LOG.info("-- Verifying edits from primary."); 643 verifyData(primaryRegion, 0, numRows, cq, families); 644 } 645 646 /** 647 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 648 * larger than the previous flush start marker. 649 */ 650 @Test 651 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 652 // load some data to primary and flush. 1 flush and some more unflushed data 653 putDataWithFlushes(primaryRegion, 100, 100, 100); 654 int numRows = 200; 655 656 // now replay the edits and the flush marker 657 reader = createWALReaderForPrimary(); 658 659 LOG.info("-- Replaying edits and flush events in secondary"); 660 FlushDescriptor startFlushDesc = null; 661 FlushDescriptor commitFlushDesc = null; 662 663 int lastReplayed = 0; 664 while (true) { 665 WAL.Entry entry = reader.next(); 666 if (entry == null) { 667 break; 668 } 669 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 670 if (flushDesc != null) { 671 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 672 if (startFlushDesc == null) { 673 LOG.info("-- Replaying flush start in secondary"); 674 startFlushDesc = flushDesc; 675 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 676 assertNull(result.result); 677 } 678 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 679 // do not replay any flush commit yet 680 // hold on to the flush commit marker but simulate a larger 681 // flush commit seqId 682 commitFlushDesc = FlushDescriptor.newBuilder(flushDesc) 683 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build(); 684 } 685 // after replay verify that everything is still visible 686 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 687 } else { 688 lastReplayed = replayEdit(secondaryRegion, entry); 689 } 690 } 691 692 // at this point, there should be some data (rows 0-100) in memstore snapshot 693 // and some more data in memstores (rows 100-200) 694 verifyData(secondaryRegion, 0, numRows, cq, families); 695 696 // no store files in the region 697 int expectedStoreFileCount = 0; 698 for (HStore s : secondaryRegion.getStores()) { 699 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 700 } 701 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 702 703 // Test case 1: replay the a flush commit marker larger than what we have prepared 704 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 705 + startFlushDesc); 706 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 707 708 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 709 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 710 711 // assert that the flush files are picked 712 expectedStoreFileCount++; 713 for (HStore s : secondaryRegion.getStores()) { 714 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 715 } 716 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 717 MemStoreSize mss = store.getFlushableSize(); 718 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 719 720 // assert that the region memstore is smaller than before, but not empty 721 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 722 assertTrue(newRegionMemstoreSize > 0); 723 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 724 725 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 726 727 LOG.info("-- Verifying edits from secondary"); 728 verifyData(secondaryRegion, 0, numRows, cq, families); 729 730 LOG.info("-- Verifying edits from primary."); 731 verifyData(primaryRegion, 0, numRows, cq, families); 732 } 733 734 /** 735 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 736 * memstore edits should be dropped after the flush commit replay since they should be in flushed 737 * files 738 */ 739 @Test 740 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 741 throws IOException { 742 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 743 } 744 745 /** 746 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 747 * memstore edits should be not dropped after the flush commit replay since not every edit will be 748 * in flushed files (based on seqId) 749 */ 750 @Test 751 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 752 throws IOException { 753 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 754 } 755 756 /** 757 * Tests the case where we receive a flush commit before receiving any flush prepare markers 758 */ 759 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 760 throws IOException { 761 // load some data to primary and flush. 1 flushes and some more unflushed data. 762 // write more data after flush depending on whether droppableSnapshot 763 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 764 int numRows = droppableMemstore ? 100 : 200; 765 766 // now replay the edits and the flush marker 767 reader = createWALReaderForPrimary(); 768 769 LOG.info("-- Replaying edits and flush events in secondary"); 770 FlushDescriptor commitFlushDesc = null; 771 772 int lastReplayed = 0; 773 while (true) { 774 WAL.Entry entry = reader.next(); 775 if (entry == null) { 776 break; 777 } 778 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 779 if (flushDesc != null) { 780 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 781 // do not replay flush start marker 782 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 783 commitFlushDesc = flushDesc; // hold on to the flush commit marker 784 } 785 // after replay verify that everything is still visible 786 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 787 } else { 788 lastReplayed = replayEdit(secondaryRegion, entry); 789 } 790 } 791 792 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 793 // and some more data in memstores (rows 100-300) 794 verifyData(secondaryRegion, 0, numRows, cq, families); 795 796 // no store files in the region 797 int expectedStoreFileCount = 0; 798 for (HStore s : secondaryRegion.getStores()) { 799 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 800 } 801 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 802 803 // Test case 1: replay a flush commit marker without start flush marker 804 assertNull(secondaryRegion.getPrepareFlushResult()); 805 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 806 807 // ensure all files are visible in secondary 808 for (HStore store : secondaryRegion.getStores()) { 809 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 810 } 811 812 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 813 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 814 815 // assert that the flush files are picked 816 expectedStoreFileCount++; 817 for (HStore s : secondaryRegion.getStores()) { 818 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 819 } 820 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 821 MemStoreSize mss = store.getFlushableSize(); 822 if (droppableMemstore) { 823 // assert that the memstore is dropped 824 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 825 } else { 826 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 827 } 828 829 // assert that the region memstore is same as before (we could not drop) 830 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 831 if (droppableMemstore) { 832 assertTrue(0 == newRegionMemstoreSize); 833 } else { 834 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 835 } 836 837 LOG.info("-- Verifying edits from secondary"); 838 verifyData(secondaryRegion, 0, numRows, cq, families); 839 840 LOG.info("-- Verifying edits from primary."); 841 verifyData(primaryRegion, 0, numRows, cq, families); 842 } 843 844 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 845 return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build(); 846 } 847 848 /** 849 * Tests replaying region open markers from primary region. Checks whether the files are picked up 850 */ 851 @Test 852 public void testReplayRegionOpenEvent() throws IOException { 853 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 854 int numRows = 100; 855 856 // close the region and open again. 857 primaryRegion.close(); 858 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 859 860 // now replay the edits and the flush marker 861 reader = createWALReaderForPrimary(); 862 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 863 864 LOG.info("-- Replaying edits and region events in secondary"); 865 while (true) { 866 WAL.Entry entry = reader.next(); 867 if (entry == null) { 868 break; 869 } 870 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 871 RegionEventDescriptor regionEventDesc = 872 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 873 874 if (flushDesc != null) { 875 // don't replay flush events 876 } else if (regionEventDesc != null) { 877 regionEvents.add(regionEventDesc); 878 } else { 879 // don't replay edits 880 } 881 } 882 883 // we should have 1 open, 1 close and 1 open event 884 assertEquals(3, regionEvents.size()); 885 886 // replay the first region open event. 887 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 888 889 // replay the close event as well 890 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 891 892 // no store files in the region 893 int expectedStoreFileCount = 0; 894 for (HStore s : secondaryRegion.getStores()) { 895 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 896 } 897 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 898 assertTrue(regionMemstoreSize == 0); 899 900 // now replay the region open event that should contain new file locations 901 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 902 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 903 904 // assert that the flush files are picked 905 expectedStoreFileCount++; 906 for (HStore s : secondaryRegion.getStores()) { 907 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 908 } 909 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 910 MemStoreSize mss = store.getFlushableSize(); 911 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 912 913 // assert that the region memstore is empty 914 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 915 assertTrue(newRegionMemstoreSize == 0); 916 917 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 918 // any 919 920 LOG.info("-- Verifying edits from secondary"); 921 verifyData(secondaryRegion, 0, numRows, cq, families); 922 923 LOG.info("-- Verifying edits from primary."); 924 verifyData(primaryRegion, 0, numRows, cq, families); 925 } 926 927 /** 928 * Tests the case where we replay a region open event after a flush start but before receiving 929 * flush commit 930 */ 931 @Test 932 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 933 putDataWithFlushes(primaryRegion, 100, 100, 100); 934 int numRows = 200; 935 936 // close the region and open again. 937 primaryRegion.close(); 938 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 939 940 // now replay the edits and the flush marker 941 reader = createWALReaderForPrimary(); 942 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 943 944 LOG.info("-- Replaying edits and region events in secondary"); 945 while (true) { 946 WAL.Entry entry = reader.next(); 947 if (entry == null) { 948 break; 949 } 950 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 951 RegionEventDescriptor regionEventDesc = 952 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 953 954 if (flushDesc != null) { 955 // only replay flush start 956 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 957 secondaryRegion.replayWALFlushStartMarker(flushDesc); 958 } 959 } else if (regionEventDesc != null) { 960 regionEvents.add(regionEventDesc); 961 } else { 962 replayEdit(secondaryRegion, entry); 963 } 964 } 965 966 // at this point, there should be some data (rows 0-100) in the memstore snapshot 967 // and some more data in memstores (rows 100-200) 968 verifyData(secondaryRegion, 0, numRows, cq, families); 969 970 // we should have 1 open, 1 close and 1 open event 971 assertEquals(3, regionEvents.size()); 972 973 // no store files in the region 974 int expectedStoreFileCount = 0; 975 for (HStore s : secondaryRegion.getStores()) { 976 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 977 } 978 979 // now replay the region open event that should contain new file locations 980 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 981 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 982 983 // assert that the flush files are picked 984 expectedStoreFileCount = 2; // two flushes happened 985 for (HStore s : secondaryRegion.getStores()) { 986 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 987 } 988 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 989 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 990 assertTrue(newSnapshotSize.getDataSize() == 0); 991 992 // assert that the region memstore is empty 993 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 994 assertTrue(newRegionMemstoreSize == 0); 995 996 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 997 // any 998 999 LOG.info("-- Verifying edits from secondary"); 1000 verifyData(secondaryRegion, 0, numRows, cq, families); 1001 1002 LOG.info("-- Verifying edits from primary."); 1003 verifyData(primaryRegion, 0, numRows, cq, families); 1004 } 1005 1006 /** 1007 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1008 * of the last replayed region open event. 1009 */ 1010 @Test 1011 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1012 putDataWithFlushes(primaryRegion, 100, 100, 0); 1013 int numRows = 100; 1014 1015 // close the region and open again. 1016 primaryRegion.close(); 1017 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1018 1019 // now replay the edits and the flush marker 1020 reader = createWALReaderForPrimary(); 1021 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1022 List<WAL.Entry> edits = Lists.newArrayList(); 1023 1024 LOG.info("-- Replaying edits and region events in secondary"); 1025 while (true) { 1026 WAL.Entry entry = reader.next(); 1027 if (entry == null) { 1028 break; 1029 } 1030 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1031 RegionEventDescriptor regionEventDesc = 1032 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1033 1034 if (flushDesc != null) { 1035 // don't replay flushes 1036 } else if (regionEventDesc != null) { 1037 regionEvents.add(regionEventDesc); 1038 } else { 1039 edits.add(entry); 1040 } 1041 } 1042 1043 // replay the region open of first open, but with the seqid of the second open 1044 // this way non of the flush files will be picked up. 1045 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0)) 1046 .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build()); 1047 1048 // replay edits from the before region close. If replay does not 1049 // skip these the following verification will NOT fail. 1050 for (WAL.Entry entry : edits) { 1051 replayEdit(secondaryRegion, entry); 1052 } 1053 1054 boolean expectedFail = false; 1055 try { 1056 verifyData(secondaryRegion, 0, numRows, cq, families); 1057 } catch (AssertionError e) { 1058 expectedFail = true; // expected 1059 } 1060 if (!expectedFail) { 1061 fail("Should have failed this verification"); 1062 } 1063 } 1064 1065 @Test 1066 public void testReplayFlushSeqIds() throws IOException { 1067 // load some data to primary and flush 1068 int start = 0; 1069 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 1070 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1071 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1072 primaryRegion.flush(true); 1073 1074 // now replay the flush marker 1075 reader = createWALReaderForPrimary(); 1076 1077 long flushSeqId = -1; 1078 LOG.info("-- Replaying flush events in secondary"); 1079 while (true) { 1080 WAL.Entry entry = reader.next(); 1081 if (entry == null) { 1082 break; 1083 } 1084 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1085 if (flushDesc != null) { 1086 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1087 LOG.info("-- Replaying flush start in secondary"); 1088 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1089 flushSeqId = flushDesc.getFlushSequenceNumber(); 1090 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1091 LOG.info("-- Replaying flush commit in secondary"); 1092 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1093 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1094 } 1095 } 1096 // else do not replay 1097 } 1098 1099 // TODO: what to do with this? 1100 // assert that the newly picked up flush file is visible 1101 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1102 assertEquals(flushSeqId, readPoint); 1103 1104 // after replay verify that everything is still visible 1105 verifyData(secondaryRegion, 0, 100, cq, families); 1106 } 1107 1108 @Test 1109 public void testSeqIdsFromReplay() throws IOException { 1110 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1111 // original seqIds and they are made visible through mvcc read point upon replay 1112 String method = name; 1113 byte[] tableName = Bytes.toBytes(method); 1114 byte[] family = Bytes.toBytes("family"); 1115 1116 HRegion region = initHRegion(tableName, family); 1117 try { 1118 // replay an entry that is bigger than current read point 1119 long readPoint = region.getMVCC().getReadPoint(); 1120 long origSeqId = readPoint + 100; 1121 1122 Put put = new Put(row).addColumn(family, row, row); 1123 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1124 replay(region, put, origSeqId); 1125 1126 // read point should have advanced to this seqId 1127 assertGet(region, family, row); 1128 1129 // region seqId should have advanced at least to this seqId 1130 assertEquals(origSeqId, region.getReadPoint(null)); 1131 1132 // replay an entry that is smaller than current read point 1133 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1134 // replay does not allow reads while replay is going on. 1135 put = new Put(row2).addColumn(family, row2, row2); 1136 put.setDurability(Durability.SKIP_WAL); 1137 replay(region, put, origSeqId - 50); 1138 1139 assertGet(region, family, row2); 1140 } finally { 1141 region.close(); 1142 } 1143 } 1144 1145 /** 1146 * Tests that a region opened in secondary mode would not write region open / close events to its 1147 * WAL. 1148 */ 1149 @Test 1150 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1151 secondaryRegion.close(); 1152 walSecondary = spy(walSecondary); 1153 1154 // test for region open and close 1155 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1156 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1157 any(WALEdit.class)); 1158 1159 // test for replay prepare flush 1160 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1161 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder() 1162 .setFlushSequenceNumber(10) 1163 .setTableName(UnsafeByteOperations 1164 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1165 .setAction(FlushAction.START_FLUSH) 1166 .setEncodedRegionName( 1167 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1168 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1169 .build()); 1170 1171 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1172 any(WALEdit.class)); 1173 1174 secondaryRegion.close(); 1175 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1176 any(WALEdit.class)); 1177 } 1178 1179 /** 1180 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1181 */ 1182 @Test 1183 public void testRegionReadsEnabledFlag() throws IOException { 1184 1185 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1186 1187 verifyData(secondaryRegion, 0, 100, cq, families); 1188 1189 // now disable reads 1190 secondaryRegion.setReadsEnabled(false); 1191 try { 1192 verifyData(secondaryRegion, 0, 100, cq, families); 1193 fail("Should have failed with IOException"); 1194 } catch (IOException ex) { 1195 // expected 1196 } 1197 1198 // verify that we can still replay data 1199 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1200 1201 // now enable reads again 1202 secondaryRegion.setReadsEnabled(true); 1203 verifyData(secondaryRegion, 0, 200, cq, families); 1204 } 1205 1206 /** 1207 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1208 * It should write the flush request marker instead. 1209 */ 1210 @Test 1211 public void testWriteFlushRequestMarker() throws IOException { 1212 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1213 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1214 assertNotNull(result); 1215 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1216 assertFalse(result.wroteFlushWalMarker); 1217 1218 // request flush again, but this time with writeFlushRequestWalMarker = true 1219 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1220 assertNotNull(result); 1221 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1222 assertTrue(result.wroteFlushWalMarker); 1223 1224 List<FlushDescriptor> flushes = Lists.newArrayList(); 1225 reader = createWALReaderForPrimary(); 1226 while (true) { 1227 WAL.Entry entry = reader.next(); 1228 if (entry == null) { 1229 break; 1230 } 1231 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1232 if (flush != null) { 1233 flushes.add(flush); 1234 } 1235 } 1236 1237 assertEquals(1, flushes.size()); 1238 assertNotNull(flushes.get(0)); 1239 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1240 } 1241 1242 /** 1243 * Test the case where the secondary region replica is not in reads enabled state because it is 1244 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush 1245 * marker entry should restore the reads enabled status in the region and allow the reads to 1246 * continue. 1247 */ 1248 @Test 1249 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1250 disableReads(secondaryRegion); 1251 1252 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1253 // triggered flush restores readsEnabled 1254 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1255 reader = createWALReaderForPrimary(); 1256 while (true) { 1257 WAL.Entry entry = reader.next(); 1258 if (entry == null) { 1259 break; 1260 } 1261 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1262 if (flush != null) { 1263 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1264 } 1265 } 1266 1267 // now reads should be enabled 1268 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1269 } 1270 1271 /** 1272 * Test the case where the secondary region replica is not in reads enabled state because it is 1273 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1274 * entries should restore the reads enabled status in the region and allow the reads to continue. 1275 */ 1276 @Test 1277 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1278 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1279 // from triggered flush restores readsEnabled 1280 disableReads(secondaryRegion); 1281 1282 // put some data in primary 1283 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1284 primaryRegion.flush(true); 1285 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1286 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1287 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1288 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1289 // but can't figure it... and this is only test that seems to suffer this flush issue. 1290 // St.Ack 20160201 1291 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1292 1293 reader = createWALReaderForPrimary(); 1294 while (true) { 1295 WAL.Entry entry = reader.next(); 1296 LOG.info(Objects.toString(entry)); 1297 if (entry == null) { 1298 break; 1299 } 1300 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1301 if (flush != null) { 1302 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1303 } else { 1304 replayEdit(secondaryRegion, entry); 1305 } 1306 } 1307 1308 // now reads should be enabled 1309 verifyData(secondaryRegion, 0, 100, cq, families); 1310 } 1311 1312 /** 1313 * Test the case where the secondary region replica is not in reads enabled state because it is 1314 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1315 * entries should restore the reads enabled status in the region and allow the reads to continue. 1316 */ 1317 @Test 1318 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1319 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1320 // from triggered flush restores readsEnabled 1321 disableReads(secondaryRegion); 1322 1323 // put some data in primary 1324 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1325 primaryRegion.flush(true); 1326 1327 reader = createWALReaderForPrimary(); 1328 while (true) { 1329 WAL.Entry entry = reader.next(); 1330 if (entry == null) { 1331 break; 1332 } 1333 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1334 if (flush != null) { 1335 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1336 } 1337 } 1338 1339 // now reads should be enabled 1340 verifyData(secondaryRegion, 0, 100, cq, families); 1341 } 1342 1343 /** 1344 * Test the case where the secondary region replica is not in reads enabled state because it is 1345 * waiting for a flush or region open marker from primary region. Replaying region open event 1346 * entry from primary should restore the reads enabled status in the region and allow the reads to 1347 * continue. 1348 */ 1349 @Test 1350 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1351 // Test case 3: Test that replaying region open event markers restores readsEnabled 1352 disableReads(secondaryRegion); 1353 1354 primaryRegion.close(); 1355 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1356 1357 reader = createWALReaderForPrimary(); 1358 while (true) { 1359 WAL.Entry entry = reader.next(); 1360 if (entry == null) { 1361 break; 1362 } 1363 1364 RegionEventDescriptor regionEventDesc = 1365 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1366 1367 if (regionEventDesc != null) { 1368 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1369 } 1370 } 1371 1372 // now reads should be enabled 1373 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1374 } 1375 1376 @Test 1377 public void testRefresStoreFiles() throws IOException { 1378 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1379 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1380 1381 // Test case 1: refresh with an empty region 1382 secondaryRegion.refreshStoreFiles(); 1383 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1384 1385 // do one flush 1386 putDataWithFlushes(primaryRegion, 100, 100, 0); 1387 int numRows = 100; 1388 1389 // refresh the store file list, and ensure that the files are picked up. 1390 secondaryRegion.refreshStoreFiles(); 1391 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1392 secondaryRegion.getStoreFileList(families)); 1393 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1394 1395 LOG.info("-- Verifying edits from secondary"); 1396 verifyData(secondaryRegion, 0, numRows, cq, families); 1397 1398 // Test case 2: 3 some more flushes 1399 putDataWithFlushes(primaryRegion, 100, 300, 0); 1400 numRows = 300; 1401 1402 // refresh the store file list, and ensure that the files are picked up. 1403 secondaryRegion.refreshStoreFiles(); 1404 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1405 secondaryRegion.getStoreFileList(families)); 1406 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1407 1408 LOG.info("-- Verifying edits from secondary"); 1409 verifyData(secondaryRegion, 0, numRows, cq, families); 1410 1411 if (FSUtils.WINDOWS) { 1412 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1413 return; 1414 } 1415 1416 // Test case 3: compact primary files 1417 primaryRegion.compactStores(); 1418 List<HRegion> regions = new ArrayList<>(); 1419 regions.add(primaryRegion); 1420 Mockito.doReturn(regions).when(rss).getRegions(); 1421 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1422 cleaner.chore(); 1423 secondaryRegion.refreshStoreFiles(); 1424 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1425 secondaryRegion.getStoreFileList(families)); 1426 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1427 1428 LOG.info("-- Verifying edits from secondary"); 1429 verifyData(secondaryRegion, 0, numRows, cq, families); 1430 1431 LOG.info("-- Replaying edits in secondary"); 1432 1433 // Test case 4: replay some edits, ensure that memstore is dropped. 1434 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1435 putDataWithFlushes(primaryRegion, 400, 400, 0); 1436 numRows = 400; 1437 1438 reader = createWALReaderForPrimary(); 1439 while (true) { 1440 WAL.Entry entry = reader.next(); 1441 if (entry == null) { 1442 break; 1443 } 1444 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1445 if (flush != null) { 1446 // do not replay flush 1447 } else { 1448 replayEdit(secondaryRegion, entry); 1449 } 1450 } 1451 1452 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1453 1454 secondaryRegion.refreshStoreFiles(); 1455 1456 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1457 1458 LOG.info("-- Verifying edits from primary"); 1459 verifyData(primaryRegion, 0, numRows, cq, families); 1460 LOG.info("-- Verifying edits from secondary"); 1461 verifyData(secondaryRegion, 0, numRows, cq, families); 1462 } 1463 1464 /** 1465 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1466 */ 1467 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1468 List<Path> l1 = new ArrayList<>(list1.size()); 1469 for (String path : list1) { 1470 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1471 } 1472 List<Path> l2 = new ArrayList<>(list2.size()); 1473 for (String path : list2) { 1474 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1475 } 1476 assertEquals(l1, l2); 1477 } 1478 1479 private void disableReads(HRegion region) { 1480 region.setReadsEnabled(false); 1481 try { 1482 verifyData(region, 0, 1, cq, families); 1483 fail("Should have failed with IOException"); 1484 } catch (IOException ex) { 1485 // expected 1486 } 1487 } 1488 1489 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1490 put.setDurability(Durability.SKIP_WAL); 1491 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1492 region.batchReplay(new MutationReplay[] { mutation }, replaySeqId); 1493 } 1494 1495 /** 1496 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1497 */ 1498 @Test 1499 public void testReplayBulkLoadEvent() throws IOException { 1500 LOG.info("testReplayBulkLoadEvent starts"); 1501 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1502 1503 // close the region and open again. 1504 primaryRegion.close(); 1505 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1506 1507 // bulk load a file into primary region 1508 byte[] randomValues = new byte[20]; 1509 Bytes.random(randomValues); 1510 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1511 1512 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1513 int expectedLoadFileCount = 0; 1514 for (byte[] family : families) { 1515 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1516 expectedLoadFileCount++; 1517 } 1518 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1519 1520 // now replay the edits and the bulk load marker 1521 reader = createWALReaderForPrimary(); 1522 1523 LOG.info("-- Replaying edits and region events in secondary"); 1524 BulkLoadDescriptor bulkloadEvent = null; 1525 while (true) { 1526 WAL.Entry entry = reader.next(); 1527 if (entry == null) { 1528 break; 1529 } 1530 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1531 if (bulkloadEvent != null) { 1532 break; 1533 } 1534 } 1535 1536 // we should have 1 bulk load event 1537 assertTrue(bulkloadEvent != null); 1538 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1539 1540 // replay the bulk load event 1541 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1542 1543 List<String> storeFileNames = new ArrayList<>(); 1544 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1545 storeFileNames.addAll(storeDesc.getStoreFileList()); 1546 } 1547 // assert that the bulk loaded files are picked 1548 for (HStore s : secondaryRegion.getStores()) { 1549 for (HStoreFile sf : s.getStorefiles()) { 1550 storeFileNames.remove(sf.getPath().getName()); 1551 } 1552 } 1553 assertTrue(storeFileNames.isEmpty(), "Found some store file isn't loaded:" + storeFileNames); 1554 1555 LOG.info("-- Verifying edits from secondary"); 1556 for (byte[] family : families) { 1557 assertGet(secondaryRegion, family, randomValues); 1558 } 1559 } 1560 1561 @Test 1562 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1563 // tests replaying flush commit marker, but the flush file has already been compacted 1564 // from primary and also deleted from the archive directory 1565 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() 1566 .setFlushSequenceNumber(Long.MAX_VALUE) 1567 .setTableName(UnsafeByteOperations 1568 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1569 .setAction(FlushAction.COMMIT_FLUSH) 1570 .setEncodedRegionName( 1571 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1572 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1573 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1574 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1575 .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build()) 1576 .build()); 1577 } 1578 1579 @Test 1580 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1581 // tests replaying compaction marker, but the compaction output file has already been compacted 1582 // from primary and also deleted from the archive directory 1583 secondaryRegion 1584 .replayWALCompactionMarker( 1585 CompactionDescriptor.newBuilder() 1586 .setTableName(UnsafeByteOperations 1587 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1588 .setEncodedRegionName( 1589 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1590 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123") 1591 .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir") 1592 .setRegionName( 1593 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1594 .build(), 1595 true, true, Long.MAX_VALUE); 1596 } 1597 1598 @Test 1599 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1600 // tests replaying region open event marker, but the region files have already been compacted 1601 // from primary and also deleted from the archive directory 1602 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1603 .setTableName(UnsafeByteOperations 1604 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1605 .setEncodedRegionName( 1606 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1607 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1608 .setEventType(EventType.REGION_OPEN) 1609 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1610 .setLogSequenceNumber(Long.MAX_VALUE) 1611 .addStores( 1612 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1613 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1614 .build()); 1615 } 1616 1617 @Test 1618 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1619 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1620 // from primary and also deleted from the archive directory 1621 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1622 .setTableName( 1623 ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1624 .setEncodedRegionName( 1625 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1626 .setBulkloadSeqNum(Long.MAX_VALUE) 1627 .addStores( 1628 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1629 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1630 .build()); 1631 } 1632 1633 private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes) 1634 throws IOException { 1635 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1636 // TODO We need a way to do this without creating files 1637 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1638 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1639 try { 1640 hFileFactory.withOutputStream(out); 1641 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1642 HFile.Writer writer = hFileFactory.create(); 1643 try { 1644 writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) 1645 .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L) 1646 .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build())); 1647 } finally { 1648 writer.close(); 1649 } 1650 } finally { 1651 out.close(); 1652 } 1653 return testFile.toString(); 1654 } 1655 1656 /** 1657 * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush 1658 * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but 1659 * does not execute flush after 1660 */ 1661 private void putDataWithFlushes(HRegion region, int flushInterval, int numRows, 1662 int numRowsAfterFlush) throws IOException { 1663 int start = 0; 1664 for (; start < numRows; start += flushInterval) { 1665 LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval)); 1666 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1667 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1668 region.flush(true); 1669 } 1670 LOG.info("-- Writing some more data to primary, not flushing"); 1671 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1672 } 1673 1674 private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf, 1675 byte[]... families) throws IOException { 1676 for (int i = startRow; i < startRow + numRows; i++) { 1677 Put put = new Put(Bytes.toBytes("" + i)); 1678 put.setDurability(Durability.SKIP_WAL); 1679 for (byte[] family : families) { 1680 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1681 } 1682 replay(region, put, i + 1); 1683 } 1684 } 1685 1686 private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException { 1687 return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, 1688 HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families); 1689 } 1690}