001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import java.util.Random; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataOutputStream; 
045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.RegionInfoBuilder; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.executor.ExecutorService; 063import org.apache.hadoop.hbase.io.hfile.HFile; 064import org.apache.hadoop.hbase.io.hfile.HFileContext; 065import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 066import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 067import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 072import org.apache.hadoop.hbase.util.FSUtils; 073import org.apache.hadoop.hbase.util.Pair; 074import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 075import org.apache.hadoop.hbase.wal.WAL; 076import org.apache.hadoop.hbase.wal.WALEdit; 077import org.apache.hadoop.hbase.wal.WALFactory; 078import org.apache.hadoop.hbase.wal.WALKeyImpl; 079import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; 080import org.apache.hadoop.util.StringUtils; 081import org.junit.After; 
082import org.junit.AfterClass; 083import org.junit.Before; 084import org.junit.BeforeClass; 085import org.junit.ClassRule; 086import org.junit.Rule; 087import org.junit.Test; 088import org.junit.experimental.categories.Category; 089import org.junit.rules.TestName; 090import org.mockito.Mockito; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 095import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 107 108/** 109 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 110 * region replicas 111 */ 112@Category(LargeTests.class) 113public class TestHRegionReplayEvents { 114 115 @ClassRule 116 public static final HBaseClassTestRule CLASS_RULE = 117 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 118 119 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 120 @Rule public TestName name = new TestName(); 121 122 private static HBaseTestingUtility TEST_UTIL; 123 124 
public static Configuration CONF;
  private String dir;

  /** Column families created on the test table. */
  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  /**
   * Creates the shared testing utility and starts a single-node mini DFS cluster.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  /**
   * Cleans the test directory and shuts the mini DFS cluster down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  /**
   * Builds the per-test fixture: a table descriptor with the three families, region infos for
   * replica 0 and replica 1 sharing one region id, a WAL for each, a mocked
   * {@link RegionServerServices}, and finally the opened primary and secondary regions.
   */
  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    // The method name is read once and reused for the table name and the root directory.
    method = name.getMethodName();
    tableName = Bytes.toBytes(method);
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    // Both replicas use the same region id and differ only in their replica id.
    long time = System.currentTimeMillis();
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    primaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String executorName =
        org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(executorName);
    es.startExecutorService(executorName + "-" + executorName, 1);
    when(rss.getExecutorService()).thenReturn(es);

    // Create the region on disk first, close it, then reopen it with the mocked services.
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  /**
   * Releases the per-test resources. Each step runs in its own {@code finally} so that a failure
   * while closing the reader or one region still closes the remaining region(s) and resets the
   * environment edge for the next test.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (reader != null) {
        reader.close();
      }
    } finally {
      try {
        if (primaryRegion != null) {
          HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
        }
      } finally {
        try {
          if (secondaryRegion != null) {
            HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
          }
        } finally {
          EnvironmentEdgeManagerTestHelper.reset();
        }
      }
    }
  }

  /**
   * @return the name of the currently running test method
   */
  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3.
replay flush with larger seqId than what is there in memstore snapshot 231 // 4. replay flush commit without flush prepare. non droppable memstore 232 // 5. replay flush commit without flush prepare. droppable memstore 233 // 6. replay open region event 234 // 7. replay open region event after flush start 235 // 8. replay flush form an earlier seqId (test ignoring seqIds) 236 // 9. start flush does not prevent region from closing. 237 238 @Test 239 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 240 // load some data and flush ensure that the secondary replica will not execute the flush 241 242 // load some data to secondary by replaying 243 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 244 245 verifyData(secondaryRegion, 0, 1000, cq, families); 246 247 // flush region 248 FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true); 249 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 250 251 verifyData(secondaryRegion, 0, 1000, cq, families); 252 253 // close the region, and inspect that it has not flushed 254 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 255 // assert that there are no files (due to flush) 256 for (List<HStoreFile> f : files.values()) { 257 assertTrue(f.isEmpty()); 258 } 259 } 260 261 /** 262 * Tests a case where we replay only a flush start marker, then the region is closed. 
This region 263 * should not block indefinitely 264 */ 265 @Test 266 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 267 // load some data to primary and flush 268 int start = 0; 269 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); 270 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 271 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 272 primaryRegion.flush(true); 273 274 // now replay the edits and the flush marker 275 reader = createWALReaderForPrimary(); 276 277 LOG.info("-- Replaying edits and flush events in secondary"); 278 while (true) { 279 WAL.Entry entry = reader.next(); 280 if (entry == null) { 281 break; 282 } 283 FlushDescriptor flushDesc 284 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 285 if (flushDesc != null) { 286 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 287 LOG.info("-- Replaying flush start in secondary"); 288 secondaryRegion.replayWALFlushStartMarker(flushDesc); 289 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 290 LOG.info("-- NOT Replaying flush commit in secondary"); 291 } 292 } else { 293 replayEdit(secondaryRegion, entry); 294 } 295 } 296 297 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 298 // now close the region which should not cause hold because of un-committed flush 299 secondaryRegion.close(); 300 301 // verify that the memstore size is back to what it was 302 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 303 } 304 305 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 306 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 307 return 0; // handled elsewhere 308 } 309 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 310 for (Cell cell : entry.getEdit().getCells()) put.add(cell); 311 put.setDurability(Durability.SKIP_WAL); 312 MutationReplay 
mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 313 region.batchReplay(new MutationReplay[] {mutation}, 314 entry.getKey().getSequenceId()); 315 return Integer.parseInt(Bytes.toString(put.getRow())); 316 } 317 318 WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { 319 return WALFactory.createReader(TEST_UTIL.getTestFileSystem(), 320 AbstractFSWALProvider.getCurrentFileName(walPrimary), 321 TEST_UTIL.getConfiguration()); 322 } 323 324 @Test 325 public void testBatchReplayWithMultipleNonces() throws IOException { 326 try { 327 MutationReplay[] mutations = new MutationReplay[100]; 328 for (int i = 0; i < 100; i++) { 329 Put put = new Put(Bytes.toBytes(i)); 330 put.setDurability(Durability.SYNC_WAL); 331 for (byte[] familly : this.families) { 332 put.addColumn(familly, this.cq, null); 333 long nonceNum = i / 10; 334 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 335 } 336 } 337 primaryRegion.batchReplay(mutations, 20); 338 } catch (Exception e) { 339 String msg = "Error while replay of batch with multiple nonces. "; 340 LOG.error(msg, e); 341 fail(msg + e.getMessage()); 342 } 343 } 344 345 @Test 346 public void testReplayFlushesAndCompactions() throws IOException { 347 // initiate a secondary region with some data. 348 349 // load some data to primary and flush. 
3 flushes and some more unflushed data 350 putDataWithFlushes(primaryRegion, 100, 300, 100); 351 352 // compaction from primary 353 LOG.info("-- Compacting primary, only 1 store"); 354 primaryRegion.compactStore(Bytes.toBytes("cf1"), 355 NoLimitThroughputController.INSTANCE); 356 357 // now replay the edits and the flush marker 358 reader = createWALReaderForPrimary(); 359 360 LOG.info("-- Replaying edits and flush events in secondary"); 361 int lastReplayed = 0; 362 int expectedStoreFileCount = 0; 363 while (true) { 364 WAL.Entry entry = reader.next(); 365 if (entry == null) { 366 break; 367 } 368 FlushDescriptor flushDesc 369 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 370 CompactionDescriptor compactionDesc 371 = WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 372 if (flushDesc != null) { 373 // first verify that everything is replayed and visible before flush event replay 374 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 375 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 376 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 377 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 378 MemStoreSize mss = store.getFlushableSize(); 379 long storeSize = store.getSize(); 380 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 381 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 382 LOG.info("-- Replaying flush start in secondary"); 383 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 384 assertNull(result.result); 385 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 386 387 // assert that the store memstore is smaller now 388 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 389 LOG.info("Memstore size reduced by:" 390 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 391 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 392 393 } else if (flushDesc.getAction() == 
FlushAction.COMMIT_FLUSH) { 394 LOG.info("-- Replaying flush commit in secondary"); 395 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 396 397 // assert that the flush files are picked 398 expectedStoreFileCount++; 399 for (HStore s : secondaryRegion.getStores()) { 400 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 401 } 402 MemStoreSize newMss = store.getFlushableSize(); 403 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 404 405 // assert that the region memstore is smaller now 406 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 407 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 408 409 // assert that the store sizes are bigger 410 assertTrue(store.getSize() > storeSize); 411 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 412 assertEquals(store.getSize(), store.getStorefilesSize()); 413 } 414 // after replay verify that everything is still visible 415 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 416 } else if (compactionDesc != null) { 417 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 418 419 // assert that the compaction is applied 420 for (HStore store : secondaryRegion.getStores()) { 421 if (store.getColumnFamilyName().equals("cf1")) { 422 assertEquals(1, store.getStorefilesCount()); 423 } else { 424 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 425 } 426 } 427 } else { 428 lastReplayed = replayEdit(secondaryRegion, entry);; 429 } 430 } 431 432 assertEquals(400-1, lastReplayed); 433 LOG.info("-- Verifying edits from secondary"); 434 verifyData(secondaryRegion, 0, 400, cq, families); 435 436 LOG.info("-- Verifying edits from primary. 
Ensuring that files are not deleted"); 437 verifyData(primaryRegion, 0, lastReplayed, cq, families); 438 for (HStore store : primaryRegion.getStores()) { 439 if (store.getColumnFamilyName().equals("cf1")) { 440 assertEquals(1, store.getStorefilesCount()); 441 } else { 442 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 443 } 444 } 445 } 446 447 /** 448 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 449 * equal to, greater or less than the previous flush start marker. 450 */ 451 @Test 452 public void testReplayFlushStartMarkers() throws IOException { 453 // load some data to primary and flush. 1 flush and some more unflushed data 454 putDataWithFlushes(primaryRegion, 100, 100, 100); 455 int numRows = 200; 456 457 // now replay the edits and the flush marker 458 reader = createWALReaderForPrimary(); 459 460 LOG.info("-- Replaying edits and flush events in secondary"); 461 462 FlushDescriptor startFlushDesc = null; 463 464 int lastReplayed = 0; 465 while (true) { 466 WAL.Entry entry = reader.next(); 467 if (entry == null) { 468 break; 469 } 470 FlushDescriptor flushDesc 471 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 472 if (flushDesc != null) { 473 // first verify that everything is replayed and visible before flush event replay 474 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 475 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 476 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 477 MemStoreSize mss = store.getFlushableSize(); 478 479 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 480 startFlushDesc = flushDesc; 481 LOG.info("-- Replaying flush start in secondary"); 482 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 483 assertNull(result.result); 484 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 485 assertTrue(regionMemstoreSize > 0); 486 
assertTrue(mss.getHeapSize() > 0); 487 488 // assert that the store memstore is smaller now 489 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 490 LOG.info("Memstore size reduced by:" 491 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 492 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 493 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 494 495 } 496 // after replay verify that everything is still visible 497 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 498 } else { 499 lastReplayed = replayEdit(secondaryRegion, entry); 500 } 501 } 502 503 // at this point, there should be some data (rows 0-100) in memstore snapshot 504 // and some more data in memstores (rows 100-200) 505 506 verifyData(secondaryRegion, 0, numRows, cq, families); 507 508 // Test case 1: replay the same flush start marker again 509 LOG.info("-- Replaying same flush start in secondary again"); 510 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 511 assertNull(result); // this should return null. Ignoring the flush start marker 512 // assert that we still have prepared flush with the previous setup. 513 assertNotNull(secondaryRegion.getPrepareFlushResult()); 514 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 515 startFlushDesc.getFlushSequenceNumber()); 516 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 517 verifyData(secondaryRegion, 0, numRows, cq, families); 518 519 // Test case 2: replay a flush start marker with a smaller seqId 520 FlushDescriptor startFlushDescSmallerSeqId 521 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 522 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 523 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 524 assertNull(result); // this should return null. 
Ignoring the flush start marker 525 // assert that we still have prepared flush with the previous setup. 526 assertNotNull(secondaryRegion.getPrepareFlushResult()); 527 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 528 startFlushDesc.getFlushSequenceNumber()); 529 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 530 verifyData(secondaryRegion, 0, numRows, cq, families); 531 532 // Test case 3: replay a flush start marker with a larger seqId 533 FlushDescriptor startFlushDescLargerSeqId 534 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 535 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 536 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 537 assertNull(result); // this should return null. Ignoring the flush start marker 538 // assert that we still have prepared flush with the previous setup. 539 assertNotNull(secondaryRegion.getPrepareFlushResult()); 540 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 541 startFlushDesc.getFlushSequenceNumber()); 542 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 543 verifyData(secondaryRegion, 0, numRows, cq, families); 544 545 LOG.info("-- Verifying edits from secondary"); 546 verifyData(secondaryRegion, 0, numRows, cq, families); 547 548 LOG.info("-- Verifying edits from primary."); 549 verifyData(primaryRegion, 0, numRows, cq, families); 550 } 551 552 /** 553 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 554 * less than the previous flush start marker. 555 */ 556 @Test 557 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 558 // load some data to primary and flush. 
2 flushes and some more unflushed data 559 putDataWithFlushes(primaryRegion, 100, 200, 100); 560 int numRows = 300; 561 562 // now replay the edits and the flush marker 563 reader = createWALReaderForPrimary(); 564 565 LOG.info("-- Replaying edits and flush events in secondary"); 566 FlushDescriptor startFlushDesc = null; 567 FlushDescriptor commitFlushDesc = null; 568 569 int lastReplayed = 0; 570 while (true) { 571 System.out.println(lastReplayed); 572 WAL.Entry entry = reader.next(); 573 if (entry == null) { 574 break; 575 } 576 FlushDescriptor flushDesc 577 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 578 if (flushDesc != null) { 579 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 580 // don't replay the first flush start marker, hold on to it, replay the second one 581 if (startFlushDesc == null) { 582 startFlushDesc = flushDesc; 583 } else { 584 LOG.info("-- Replaying flush start in secondary"); 585 startFlushDesc = flushDesc; 586 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 587 assertNull(result.result); 588 } 589 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 590 // do not replay any flush commit yet 591 if (commitFlushDesc == null) { 592 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 593 } 594 } 595 // after replay verify that everything is still visible 596 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 597 } else { 598 lastReplayed = replayEdit(secondaryRegion, entry); 599 } 600 } 601 602 // at this point, there should be some data (rows 0-200) in memstore snapshot 603 // and some more data in memstores (rows 200-300) 604 verifyData(secondaryRegion, 0, numRows, cq, families); 605 606 // no store files in the region 607 int expectedStoreFileCount = 0; 608 for (HStore s : secondaryRegion.getStores()) { 609 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 610 } 611 long regionMemstoreSize = 
secondaryRegion.getMemStoreDataSize(); 612 613 // Test case 1: replay the a flush commit marker smaller than what we have prepared 614 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 615 + startFlushDesc); 616 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 617 618 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 619 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 620 621 // assert that the flush files are picked 622 expectedStoreFileCount++; 623 for (HStore s : secondaryRegion.getStores()) { 624 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 625 } 626 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 627 MemStoreSize mss = store.getFlushableSize(); 628 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 629 630 // assert that the region memstore is same as before 631 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 632 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 633 634 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 635 636 LOG.info("-- Verifying edits from secondary"); 637 verifyData(secondaryRegion, 0, numRows, cq, families); 638 639 LOG.info("-- Verifying edits from primary."); 640 verifyData(primaryRegion, 0, numRows, cq, families); 641 } 642 643 /** 644 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 645 * larger than the previous flush start marker. 646 */ 647 @Test 648 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 649 // load some data to primary and flush. 
1 flush and some more unflushed data 650 putDataWithFlushes(primaryRegion, 100, 100, 100); 651 int numRows = 200; 652 653 // now replay the edits and the flush marker 654 reader = createWALReaderForPrimary(); 655 656 LOG.info("-- Replaying edits and flush events in secondary"); 657 FlushDescriptor startFlushDesc = null; 658 FlushDescriptor commitFlushDesc = null; 659 660 int lastReplayed = 0; 661 while (true) { 662 WAL.Entry entry = reader.next(); 663 if (entry == null) { 664 break; 665 } 666 FlushDescriptor flushDesc 667 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 668 if (flushDesc != null) { 669 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 670 if (startFlushDesc == null) { 671 LOG.info("-- Replaying flush start in secondary"); 672 startFlushDesc = flushDesc; 673 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 674 assertNull(result.result); 675 } 676 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 677 // do not replay any flush commit yet 678 // hold on to the flush commit marker but simulate a larger 679 // flush commit seqId 680 commitFlushDesc = 681 FlushDescriptor.newBuilder(flushDesc) 682 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50) 683 .build(); 684 } 685 // after replay verify that everything is still visible 686 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 687 } else { 688 lastReplayed = replayEdit(secondaryRegion, entry); 689 } 690 } 691 692 // at this point, there should be some data (rows 0-100) in memstore snapshot 693 // and some more data in memstores (rows 100-200) 694 verifyData(secondaryRegion, 0, numRows, cq, families); 695 696 // no store files in the region 697 int expectedStoreFileCount = 0; 698 for (HStore s : secondaryRegion.getStores()) { 699 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 700 } 701 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 702 703 // Test case 1: replay the a flush 
commit marker larger than what we have prepared 704 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 705 + startFlushDesc); 706 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 707 708 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 709 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 710 711 // assert that the flush files are picked 712 expectedStoreFileCount++; 713 for (HStore s : secondaryRegion.getStores()) { 714 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 715 } 716 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 717 MemStoreSize mss = store.getFlushableSize(); 718 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 719 720 // assert that the region memstore is smaller than before, but not empty 721 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 722 assertTrue(newRegionMemstoreSize > 0); 723 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 724 725 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 726 727 LOG.info("-- Verifying edits from secondary"); 728 verifyData(secondaryRegion, 0, numRows, cq, families); 729 730 LOG.info("-- Verifying edits from primary."); 731 verifyData(primaryRegion, 0, numRows, cq, families); 732 } 733 734 /** 735 * Tests the case where we receive a flush commit before receiving any flush prepare markers. 736 * The memstore edits should be dropped after the flush commit replay since they should be in 737 * flushed files 738 */ 739 @Test 740 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 741 throws IOException { 742 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 743 } 744 745 /** 746 * Tests the case where we receive a flush commit before receiving any flush prepare markers. 
747 * The memstore edits should be not dropped after the flush commit replay since not every edit 748 * will be in flushed files (based on seqId) 749 */ 750 @Test 751 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 752 throws IOException { 753 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 754 } 755 756 /** 757 * Tests the case where we receive a flush commit before receiving any flush prepare markers 758 */ 759 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 760 throws IOException { 761 // load some data to primary and flush. 1 flushes and some more unflushed data. 762 // write more data after flush depending on whether droppableSnapshot 763 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 764 int numRows = droppableMemstore ? 100 : 200; 765 766 // now replay the edits and the flush marker 767 reader = createWALReaderForPrimary(); 768 769 LOG.info("-- Replaying edits and flush events in secondary"); 770 FlushDescriptor commitFlushDesc = null; 771 772 int lastReplayed = 0; 773 while (true) { 774 WAL.Entry entry = reader.next(); 775 if (entry == null) { 776 break; 777 } 778 FlushDescriptor flushDesc 779 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 780 if (flushDesc != null) { 781 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 782 // do not replay flush start marker 783 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 784 commitFlushDesc = flushDesc; // hold on to the flush commit marker 785 } 786 // after replay verify that everything is still visible 787 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 788 } else { 789 lastReplayed = replayEdit(secondaryRegion, entry); 790 } 791 } 792 793 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 794 // and some more data in memstores (rows 100-300) 795 verifyData(secondaryRegion, 0, numRows, cq, families); 796 797 
// no store files in the region 798 int expectedStoreFileCount = 0; 799 for (HStore s : secondaryRegion.getStores()) { 800 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 801 } 802 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 803 804 // Test case 1: replay a flush commit marker without start flush marker 805 assertNull(secondaryRegion.getPrepareFlushResult()); 806 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 807 808 // ensure all files are visible in secondary 809 for (HStore store : secondaryRegion.getStores()) { 810 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 811 } 812 813 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 814 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 815 816 // assert that the flush files are picked 817 expectedStoreFileCount++; 818 for (HStore s : secondaryRegion.getStores()) { 819 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 820 } 821 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 822 MemStoreSize mss = store.getFlushableSize(); 823 if (droppableMemstore) { 824 // assert that the memstore is dropped 825 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 826 } else { 827 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 828 } 829 830 // assert that the region memstore is same as before (we could not drop) 831 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 832 if (droppableMemstore) { 833 assertTrue(0 == newRegionMemstoreSize); 834 } else { 835 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 836 } 837 838 LOG.info("-- Verifying edits from secondary"); 839 verifyData(secondaryRegion, 0, numRows, cq, families); 840 841 LOG.info("-- Verifying edits from primary."); 842 verifyData(primaryRegion, 0, numRows, cq, families); 843 } 844 845 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 846 return 
FlushDescriptor.newBuilder(flush) 847 .setFlushSequenceNumber(flushSeqId) 848 .build(); 849 } 850 851 /** 852 * Tests replaying region open markers from primary region. Checks whether the files are picked up 853 */ 854 @Test 855 public void testReplayRegionOpenEvent() throws IOException { 856 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 857 int numRows = 100; 858 859 // close the region and open again. 860 primaryRegion.close(); 861 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 862 863 // now replay the edits and the flush marker 864 reader = createWALReaderForPrimary(); 865 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 866 867 LOG.info("-- Replaying edits and region events in secondary"); 868 while (true) { 869 WAL.Entry entry = reader.next(); 870 if (entry == null) { 871 break; 872 } 873 FlushDescriptor flushDesc 874 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 875 RegionEventDescriptor regionEventDesc 876 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 877 878 if (flushDesc != null) { 879 // don't replay flush events 880 } else if (regionEventDesc != null) { 881 regionEvents.add(regionEventDesc); 882 } else { 883 // don't replay edits 884 } 885 } 886 887 // we should have 1 open, 1 close and 1 open event 888 assertEquals(3, regionEvents.size()); 889 890 // replay the first region open event. 
891 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 892 893 // replay the close event as well 894 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 895 896 // no store files in the region 897 int expectedStoreFileCount = 0; 898 for (HStore s : secondaryRegion.getStores()) { 899 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 900 } 901 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 902 assertTrue(regionMemstoreSize == 0); 903 904 // now replay the region open event that should contain new file locations 905 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 906 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 907 908 // assert that the flush files are picked 909 expectedStoreFileCount++; 910 for (HStore s : secondaryRegion.getStores()) { 911 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 912 } 913 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 914 MemStoreSize mss = store.getFlushableSize(); 915 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 916 917 // assert that the region memstore is empty 918 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 919 assertTrue(newRegionMemstoreSize == 0); 920 921 assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any 922 923 LOG.info("-- Verifying edits from secondary"); 924 verifyData(secondaryRegion, 0, numRows, cq, families); 925 926 LOG.info("-- Verifying edits from primary."); 927 verifyData(primaryRegion, 0, numRows, cq, families); 928 } 929 930 /** 931 * Tests the case where we replay a region open event after a flush start but before receiving 932 * flush commit 933 */ 934 @Test 935 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 936 putDataWithFlushes(primaryRegion, 100, 100, 100); 937 int numRows = 200; 938 939 // close the region and open again. 
940 primaryRegion.close(); 941 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 942 943 // now replay the edits and the flush marker 944 reader = createWALReaderForPrimary(); 945 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 946 947 LOG.info("-- Replaying edits and region events in secondary"); 948 while (true) { 949 WAL.Entry entry = reader.next(); 950 if (entry == null) { 951 break; 952 } 953 FlushDescriptor flushDesc 954 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 955 RegionEventDescriptor regionEventDesc 956 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 957 958 if (flushDesc != null) { 959 // only replay flush start 960 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 961 secondaryRegion.replayWALFlushStartMarker(flushDesc); 962 } 963 } else if (regionEventDesc != null) { 964 regionEvents.add(regionEventDesc); 965 } else { 966 replayEdit(secondaryRegion, entry); 967 } 968 } 969 970 // at this point, there should be some data (rows 0-100) in the memstore snapshot 971 // and some more data in memstores (rows 100-200) 972 verifyData(secondaryRegion, 0, numRows, cq, families); 973 974 // we should have 1 open, 1 close and 1 open event 975 assertEquals(3, regionEvents.size()); 976 977 // no store files in the region 978 int expectedStoreFileCount = 0; 979 for (HStore s : secondaryRegion.getStores()) { 980 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 981 } 982 983 // now replay the region open event that should contain new file locations 984 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 985 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 986 987 // assert that the flush files are picked 988 expectedStoreFileCount = 2; // two flushes happened 989 for (HStore s : secondaryRegion.getStores()) { 990 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 991 } 992 HStore store = 
secondaryRegion.getStore(Bytes.toBytes("cf1")); 993 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 994 assertTrue(newSnapshotSize.getDataSize() == 0); 995 996 // assert that the region memstore is empty 997 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 998 assertTrue(newRegionMemstoreSize == 0); 999 1000 assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any 1001 1002 LOG.info("-- Verifying edits from secondary"); 1003 verifyData(secondaryRegion, 0, numRows, cq, families); 1004 1005 LOG.info("-- Verifying edits from primary."); 1006 verifyData(primaryRegion, 0, numRows, cq, families); 1007 } 1008 1009 /** 1010 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1011 * of the last replayed region open event. 1012 */ 1013 @Test 1014 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1015 putDataWithFlushes(primaryRegion, 100, 100, 0); 1016 int numRows = 100; 1017 1018 // close the region and open again. 
1019 primaryRegion.close(); 1020 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1021 1022 // now replay the edits and the flush marker 1023 reader = createWALReaderForPrimary(); 1024 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1025 List<WAL.Entry> edits = Lists.newArrayList(); 1026 1027 LOG.info("-- Replaying edits and region events in secondary"); 1028 while (true) { 1029 WAL.Entry entry = reader.next(); 1030 if (entry == null) { 1031 break; 1032 } 1033 FlushDescriptor flushDesc 1034 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1035 RegionEventDescriptor regionEventDesc 1036 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1037 1038 if (flushDesc != null) { 1039 // don't replay flushes 1040 } else if (regionEventDesc != null) { 1041 regionEvents.add(regionEventDesc); 1042 } else { 1043 edits.add(entry); 1044 } 1045 } 1046 1047 // replay the region open of first open, but with the seqid of the second open 1048 // this way non of the flush files will be picked up. 1049 secondaryRegion.replayWALRegionEventMarker( 1050 RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber( 1051 regionEvents.get(2).getLogSequenceNumber()).build()); 1052 1053 1054 // replay edits from the before region close. If replay does not 1055 // skip these the following verification will NOT fail. 
1056 for (WAL.Entry entry: edits) { 1057 replayEdit(secondaryRegion, entry); 1058 } 1059 1060 boolean expectedFail = false; 1061 try { 1062 verifyData(secondaryRegion, 0, numRows, cq, families); 1063 } catch (AssertionError e) { 1064 expectedFail = true; // expected 1065 } 1066 if (!expectedFail) { 1067 fail("Should have failed this verification"); 1068 } 1069 } 1070 1071 @Test 1072 public void testReplayFlushSeqIds() throws IOException { 1073 // load some data to primary and flush 1074 int start = 0; 1075 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); 1076 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1077 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1078 primaryRegion.flush(true); 1079 1080 // now replay the flush marker 1081 reader = createWALReaderForPrimary(); 1082 1083 long flushSeqId = -1; 1084 LOG.info("-- Replaying flush events in secondary"); 1085 while (true) { 1086 WAL.Entry entry = reader.next(); 1087 if (entry == null) { 1088 break; 1089 } 1090 FlushDescriptor flushDesc 1091 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1092 if (flushDesc != null) { 1093 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1094 LOG.info("-- Replaying flush start in secondary"); 1095 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1096 flushSeqId = flushDesc.getFlushSequenceNumber(); 1097 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1098 LOG.info("-- Replaying flush commit in secondary"); 1099 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1100 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1101 } 1102 } 1103 // else do not replay 1104 } 1105 1106 // TODO: what to do with this? 
1107 // assert that the newly picked up flush file is visible 1108 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1109 assertEquals(flushSeqId, readPoint); 1110 1111 // after replay verify that everything is still visible 1112 verifyData(secondaryRegion, 0, 100, cq, families); 1113 } 1114 1115 @Test 1116 public void testSeqIdsFromReplay() throws IOException { 1117 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1118 // original seqIds and they are made visible through mvcc read point upon replay 1119 String method = name.getMethodName(); 1120 byte[] tableName = Bytes.toBytes(method); 1121 byte[] family = Bytes.toBytes("family"); 1122 1123 HRegion region = initHRegion(tableName, method, family); 1124 try { 1125 // replay an entry that is bigger than current read point 1126 long readPoint = region.getMVCC().getReadPoint(); 1127 long origSeqId = readPoint + 100; 1128 1129 Put put = new Put(row).addColumn(family, row, row); 1130 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1131 replay(region, put, origSeqId); 1132 1133 // read point should have advanced to this seqId 1134 assertGet(region, family, row); 1135 1136 // region seqId should have advanced at least to this seqId 1137 assertEquals(origSeqId, region.getReadPoint(null)); 1138 1139 // replay an entry that is smaller than current read point 1140 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1141 // replay does not allow reads while replay is going on. 1142 put = new Put(row2).addColumn(family, row2, row2); 1143 put.setDurability(Durability.SKIP_WAL); 1144 replay(region, put, origSeqId - 50); 1145 1146 assertGet(region, family, row2); 1147 } finally { 1148 region.close(); 1149 } 1150 } 1151 1152 /** 1153 * Tests that a region opened in secondary mode would not write region open / close 1154 * events to its WAL. 
1155 * @throws IOException 1156 */ 1157 @Test 1158 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1159 secondaryRegion.close(); 1160 walSecondary = spy(walSecondary); 1161 1162 // test for region open and close 1163 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1164 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1165 any(WALEdit.class)); 1166 1167 // test for replay prepare flush 1168 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1169 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder(). 1170 setFlushSequenceNumber(10) 1171 .setTableName(UnsafeByteOperations.unsafeWrap( 1172 primaryRegion.getTableDescriptor().getTableName().getName())) 1173 .setAction(FlushAction.START_FLUSH) 1174 .setEncodedRegionName( 1175 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1176 .setRegionName(UnsafeByteOperations.unsafeWrap( 1177 primaryRegion.getRegionInfo().getRegionName())) 1178 .build()); 1179 1180 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1181 any(WALEdit.class)); 1182 1183 secondaryRegion.close(); 1184 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1185 any(WALEdit.class)); 1186 } 1187 1188 /** 1189 * Tests the reads enabled flag for the region. 
When unset all reads should be rejected 1190 */ 1191 @Test 1192 public void testRegionReadsEnabledFlag() throws IOException { 1193 1194 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1195 1196 verifyData(secondaryRegion, 0, 100, cq, families); 1197 1198 // now disable reads 1199 secondaryRegion.setReadsEnabled(false); 1200 try { 1201 verifyData(secondaryRegion, 0, 100, cq, families); 1202 fail("Should have failed with IOException"); 1203 } catch(IOException ex) { 1204 // expected 1205 } 1206 1207 // verify that we can still replay data 1208 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1209 1210 // now enable reads again 1211 secondaryRegion.setReadsEnabled(true); 1212 verifyData(secondaryRegion, 0, 200, cq, families); 1213 } 1214 1215 /** 1216 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1217 * It should write the flush request marker instead. 1218 */ 1219 @Test 1220 public void testWriteFlushRequestMarker() throws IOException { 1221 // primary region is empty at this point. 
Request a flush with writeFlushRequestWalMarker=false 1222 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1223 assertNotNull(result); 1224 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1225 assertFalse(result.wroteFlushWalMarker); 1226 1227 // request flush again, but this time with writeFlushRequestWalMarker = true 1228 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1229 assertNotNull(result); 1230 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1231 assertTrue(result.wroteFlushWalMarker); 1232 1233 List<FlushDescriptor> flushes = Lists.newArrayList(); 1234 reader = createWALReaderForPrimary(); 1235 while (true) { 1236 WAL.Entry entry = reader.next(); 1237 if (entry == null) { 1238 break; 1239 } 1240 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1241 if (flush != null) { 1242 flushes.add(flush); 1243 } 1244 } 1245 1246 assertEquals(1, flushes.size()); 1247 assertNotNull(flushes.get(0)); 1248 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1249 } 1250 1251 /** 1252 * Test the case where the secondary region replica is not in reads enabled state because it is 1253 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH 1254 * flush marker entry should restore the reads enabled status in the region and allow the reads 1255 * to continue. 
1256 */ 1257 @Test 1258 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1259 disableReads(secondaryRegion); 1260 1261 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1262 // triggered flush restores readsEnabled 1263 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1264 reader = createWALReaderForPrimary(); 1265 while (true) { 1266 WAL.Entry entry = reader.next(); 1267 if (entry == null) { 1268 break; 1269 } 1270 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1271 if (flush != null) { 1272 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1273 } 1274 } 1275 1276 // now reads should be enabled 1277 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1278 } 1279 1280 /** 1281 * Test the case where the secondary region replica is not in reads enabled state because it is 1282 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1283 * entries should restore the reads enabled status in the region and allow the reads 1284 * to continue. 1285 */ 1286 @Test 1287 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1288 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1289 // from triggered flush restores readsEnabled 1290 disableReads(secondaryRegion); 1291 1292 // put some data in primary 1293 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1294 primaryRegion.flush(true); 1295 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1296 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1297 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1298 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1299 // but can't figure it... 
and this is only test that seems to suffer this flush issue. 1300 // St.Ack 20160201 1301 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1302 1303 reader = createWALReaderForPrimary(); 1304 while (true) { 1305 WAL.Entry entry = reader.next(); 1306 LOG.info(Objects.toString(entry)); 1307 if (entry == null) { 1308 break; 1309 } 1310 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1311 if (flush != null) { 1312 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1313 } else { 1314 replayEdit(secondaryRegion, entry); 1315 } 1316 } 1317 1318 // now reads should be enabled 1319 verifyData(secondaryRegion, 0, 100, cq, families); 1320 } 1321 1322 /** 1323 * Test the case where the secondary region replica is not in reads enabled state because it is 1324 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1325 * entries should restore the reads enabled status in the region and allow the reads 1326 * to continue. 
1327 */ 1328 @Test 1329 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1330 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1331 // from triggered flush restores readsEnabled 1332 disableReads(secondaryRegion); 1333 1334 // put some data in primary 1335 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1336 primaryRegion.flush(true); 1337 1338 reader = createWALReaderForPrimary(); 1339 while (true) { 1340 WAL.Entry entry = reader.next(); 1341 if (entry == null) { 1342 break; 1343 } 1344 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1345 if (flush != null) { 1346 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1347 } 1348 } 1349 1350 // now reads should be enabled 1351 verifyData(secondaryRegion, 0, 100, cq, families); 1352 } 1353 1354 /** 1355 * Test the case where the secondary region replica is not in reads enabled state because it is 1356 * waiting for a flush or region open marker from primary region. Replaying region open event 1357 * entry from primary should restore the reads enabled status in the region and allow the reads 1358 * to continue. 
1359 */ 1360 @Test 1361 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1362 // Test case 3: Test that replaying region open event markers restores readsEnabled 1363 disableReads(secondaryRegion); 1364 1365 primaryRegion.close(); 1366 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1367 1368 reader = createWALReaderForPrimary(); 1369 while (true) { 1370 WAL.Entry entry = reader.next(); 1371 if (entry == null) { 1372 break; 1373 } 1374 1375 RegionEventDescriptor regionEventDesc 1376 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1377 1378 if (regionEventDesc != null) { 1379 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1380 } 1381 } 1382 1383 // now reads should be enabled 1384 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1385 } 1386 1387 @Test 1388 public void testRefresStoreFiles() throws IOException { 1389 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1390 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1391 1392 // Test case 1: refresh with an empty region 1393 secondaryRegion.refreshStoreFiles(); 1394 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1395 1396 // do one flush 1397 putDataWithFlushes(primaryRegion, 100, 100, 0); 1398 int numRows = 100; 1399 1400 // refresh the store file list, and ensure that the files are picked up. 
1401 secondaryRegion.refreshStoreFiles(); 1402 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1403 secondaryRegion.getStoreFileList(families)); 1404 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1405 1406 LOG.info("-- Verifying edits from secondary"); 1407 verifyData(secondaryRegion, 0, numRows, cq, families); 1408 1409 // Test case 2: 3 some more flushes 1410 putDataWithFlushes(primaryRegion, 100, 300, 0); 1411 numRows = 300; 1412 1413 // refresh the store file list, and ensure that the files are picked up. 1414 secondaryRegion.refreshStoreFiles(); 1415 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1416 secondaryRegion.getStoreFileList(families)); 1417 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1418 1419 LOG.info("-- Verifying edits from secondary"); 1420 verifyData(secondaryRegion, 0, numRows, cq, families); 1421 1422 if (FSUtils.WINDOWS) { 1423 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1424 return; 1425 } 1426 1427 // Test case 3: compact primary files 1428 primaryRegion.compactStores(); 1429 List<HRegion> regions = new ArrayList<>(); 1430 regions.add(primaryRegion); 1431 Mockito.doReturn(regions).when(rss).getRegions(); 1432 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1433 cleaner.chore(); 1434 secondaryRegion.refreshStoreFiles(); 1435 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1436 secondaryRegion.getStoreFileList(families)); 1437 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1438 1439 LOG.info("-- Verifying edits from secondary"); 1440 verifyData(secondaryRegion, 0, numRows, cq, families); 1441 1442 LOG.info("-- Replaying edits in secondary"); 1443 1444 // Test case 4: replay some edits, ensure that memstore is dropped. 
1445 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1446 putDataWithFlushes(primaryRegion, 400, 400, 0); 1447 numRows = 400; 1448 1449 reader = createWALReaderForPrimary(); 1450 while (true) { 1451 WAL.Entry entry = reader.next(); 1452 if (entry == null) { 1453 break; 1454 } 1455 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1456 if (flush != null) { 1457 // do not replay flush 1458 } else { 1459 replayEdit(secondaryRegion, entry); 1460 } 1461 } 1462 1463 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1464 1465 secondaryRegion.refreshStoreFiles(); 1466 1467 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1468 1469 LOG.info("-- Verifying edits from primary"); 1470 verifyData(primaryRegion, 0, numRows, cq, families); 1471 LOG.info("-- Verifying edits from secondary"); 1472 verifyData(secondaryRegion, 0, numRows, cq, families); 1473 } 1474 1475 /** 1476 * Paths can be qualified or not. This does the assertion using String->Path conversion. 
1477 */ 1478 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1479 List<Path> l1 = new ArrayList<>(list1.size()); 1480 for (String path : list1) { 1481 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1482 } 1483 List<Path> l2 = new ArrayList<>(list2.size()); 1484 for (String path : list2) { 1485 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1486 } 1487 assertEquals(l1, l2); 1488 } 1489 1490 private void disableReads(HRegion region) { 1491 region.setReadsEnabled(false); 1492 try { 1493 verifyData(region, 0, 1, cq, families); 1494 fail("Should have failed with IOException"); 1495 } catch(IOException ex) { 1496 // expected 1497 } 1498 } 1499 1500 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1501 put.setDurability(Durability.SKIP_WAL); 1502 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1503 region.batchReplay(new MutationReplay[] {mutation}, replaySeqId); 1504 } 1505 1506 /** 1507 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1508 */ 1509 @Test 1510 public void testReplayBulkLoadEvent() throws IOException { 1511 LOG.info("testReplayBulkLoadEvent starts"); 1512 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1513 1514 // close the region and open again. 
1515 primaryRegion.close(); 1516 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1517 1518 // bulk load a file into primary region 1519 Random random = new Random(); 1520 byte[] randomValues = new byte[20]; 1521 random.nextBytes(randomValues); 1522 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1523 1524 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1525 int expectedLoadFileCount = 0; 1526 for (byte[] family : families) { 1527 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1528 expectedLoadFileCount++; 1529 } 1530 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1531 1532 // now replay the edits and the bulk load marker 1533 reader = createWALReaderForPrimary(); 1534 1535 LOG.info("-- Replaying edits and region events in secondary"); 1536 BulkLoadDescriptor bulkloadEvent = null; 1537 while (true) { 1538 WAL.Entry entry = reader.next(); 1539 if (entry == null) { 1540 break; 1541 } 1542 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1543 if (bulkloadEvent != null) { 1544 break; 1545 } 1546 } 1547 1548 // we should have 1 bulk load event 1549 assertTrue(bulkloadEvent != null); 1550 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1551 1552 // replay the bulk load event 1553 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1554 1555 1556 List<String> storeFileName = new ArrayList<>(); 1557 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1558 storeFileName.addAll(storeDesc.getStoreFileList()); 1559 } 1560 // assert that the bulk loaded files are picked 1561 for (HStore s : secondaryRegion.getStores()) { 1562 for (HStoreFile sf : s.getStorefiles()) { 1563 storeFileName.remove(sf.getPath().getName()); 1564 } 1565 } 1566 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); 1567 1568 LOG.info("-- Verifying edits from secondary"); 1569 for 
(byte[] family : families) { 1570 assertGet(secondaryRegion, family, randomValues); 1571 } 1572 } 1573 1574 @Test 1575 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1576 // tests replaying flush commit marker, but the flush file has already been compacted 1577 // from primary and also deleted from the archive directory 1578 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder(). 1579 setFlushSequenceNumber(Long.MAX_VALUE) 1580 .setTableName(UnsafeByteOperations.unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1581 .setAction(FlushAction.COMMIT_FLUSH) 1582 .setEncodedRegionName( 1583 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1584 .setRegionName(UnsafeByteOperations.unsafeWrap( 1585 primaryRegion.getRegionInfo().getRegionName())) 1586 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1587 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1588 .setStoreHomeDir("/store_home_dir") 1589 .addFlushOutput("/foo/baz/123") 1590 .build()) 1591 .build()); 1592 } 1593 1594 @Test 1595 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1596 // tests replaying compaction marker, but the compaction output file has already been compacted 1597 // from primary and also deleted from the archive directory 1598 secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder() 1599 .setTableName(UnsafeByteOperations.unsafeWrap( 1600 primaryRegion.getTableDescriptor().getTableName().getName())) 1601 .setEncodedRegionName( 1602 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1603 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1604 .addCompactionInput("/123") 1605 .addCompactionOutput("/456") 1606 .setStoreHomeDir("/store_home_dir") 1607 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1608 .build() 1609 , true, true, 
Long.MAX_VALUE); 1610 } 1611 1612 @Test 1613 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1614 // tests replaying region open event marker, but the region files have already been compacted 1615 // from primary and also deleted from the archive directory 1616 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1617 .setTableName(UnsafeByteOperations.unsafeWrap( 1618 primaryRegion.getTableDescriptor().getTableName().getName())) 1619 .setEncodedRegionName( 1620 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1621 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1622 .setEventType(EventType.REGION_OPEN) 1623 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1624 .setLogSequenceNumber(Long.MAX_VALUE) 1625 .addStores(StoreDescriptor.newBuilder() 1626 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1627 .setStoreHomeDir("/store_home_dir") 1628 .addStoreFile("/123") 1629 .build()) 1630 .build()); 1631 } 1632 1633 @Test 1634 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1635 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1636 // from primary and also deleted from the archive directory 1637 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1638 .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1639 .setEncodedRegionName( 1640 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1641 .setBulkloadSeqNum(Long.MAX_VALUE) 1642 .addStores(StoreDescriptor.newBuilder() 1643 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1644 .setStoreHomeDir("/store_home_dir") 1645 .addStoreFile("/123") 1646 .build()) 1647 .build()); 1648 } 1649 1650 private String createHFileForFamilies(Path testPath, byte[] 
family, 1651 byte[] valueBytes) throws IOException { 1652 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1653 // TODO We need a way to do this without creating files 1654 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1655 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1656 try { 1657 hFileFactory.withOutputStream(out); 1658 hFileFactory.withFileContext(new HFileContext()); 1659 HFile.Writer writer = hFileFactory.create(); 1660 try { 1661 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L, 1662 KeyValue.Type.Put.getCode(), valueBytes))); 1663 } finally { 1664 writer.close(); 1665 } 1666 } finally { 1667 out.close(); 1668 } 1669 return testFile.toString(); 1670 } 1671 1672 /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does 1673 * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of 1674 * more rows but does not execute flush after 1675 * @throws IOException */ 1676 private void putDataWithFlushes(HRegion region, int flushInterval, 1677 int numRows, int numRowsAfterFlush) throws IOException { 1678 int start = 0; 1679 for (; start < numRows; start += flushInterval) { 1680 LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval)); 1681 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1682 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1683 region.flush(true); 1684 } 1685 LOG.info("-- Writing some more data to primary, not flushing"); 1686 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1687 } 1688 1689 private void putDataByReplay(HRegion region, 1690 int startRow, int numRows, byte[] qf, byte[]... 
families) throws IOException { 1691 for (int i = startRow; i < startRow + numRows; i++) { 1692 Put put = new Put(Bytes.toBytes("" + i)); 1693 put.setDurability(Durability.SKIP_WAL); 1694 for (byte[] family : families) { 1695 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1696 } 1697 replay(region, put, i+1); 1698 } 1699 } 1700 1701 private static HRegion initHRegion(byte[] tableName, 1702 String callingMethod, byte[]... families) throws IOException { 1703 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1704 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families); 1705 } 1706 1707 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, 1708 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, 1709 WAL wal, byte[]... families) throws IOException { 1710 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, 1711 isReadOnly, durability, wal, families); 1712 } 1713}