001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FSDataOutputStream; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.CellUtil; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HBaseTestingUtility; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.KeyValue; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 054import org.apache.hadoop.hbase.client.Durability; 055import org.apache.hadoop.hbase.client.Get; 056import org.apache.hadoop.hbase.client.Put; 057import org.apache.hadoop.hbase.client.RegionInfo; 058import org.apache.hadoop.hbase.client.RegionInfoBuilder; 059import org.apache.hadoop.hbase.client.TableDescriptor; 060import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 061import org.apache.hadoop.hbase.executor.ExecutorService; 062import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; 063import org.apache.hadoop.hbase.executor.ExecutorType; 064import org.apache.hadoop.hbase.io.hfile.HFile; 065import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 066import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 067import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 068import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 069import org.apache.hadoop.hbase.testclassification.LargeTests; 070import org.apache.hadoop.hbase.util.Bytes; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 073import org.apache.hadoop.hbase.util.FSUtils; 074import org.apache.hadoop.hbase.util.Pair; 075import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 076import org.apache.hadoop.hbase.wal.WAL; 077import org.apache.hadoop.hbase.wal.WALEdit; 078import org.apache.hadoop.hbase.wal.WALFactory; 079import org.apache.hadoop.hbase.wal.WALKeyImpl; 080import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 081import org.apache.hadoop.util.StringUtils; 082import org.junit.After; 083import org.junit.AfterClass; 084import org.junit.Before; 085import org.junit.BeforeClass; 086import org.junit.ClassRule; 087import org.junit.Rule; 088import org.junit.Test; 089import org.junit.experimental.categories.Category; 090import org.junit.rules.TestName; 091import org.mockito.Mockito; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 096import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 097 098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 108 109/** 110 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 111 * region replicas 112 */ 113@Category(LargeTests.class) 114public class TestHRegionReplayEvents { 115 116 @ClassRule 117 public static final HBaseClassTestRule CLASS_RULE = 118 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 119 120 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 121 @Rule 122 public TestName name = new TestName(); 123 124 private static HBaseTestingUtility TEST_UTIL; 125 126 public static Configuration CONF; 127 private String dir; 128 129 private byte[][] families = 130 new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") }; 131 132 // Test names 133 protected byte[] tableName; 134 protected String method; 135 protected final byte[] row = Bytes.toBytes("rowA"); 136 protected final byte[] row2 = Bytes.toBytes("rowB"); 137 protected byte[] cq = Bytes.toBytes("cq"); 138 139 // per test fields 140 private Path rootDir; 141 private TableDescriptor htd; 142 private RegionServerServices rss; 143 private RegionInfo primaryHri, secondaryHri; 144 private HRegion primaryRegion, secondaryRegion; 145 private WAL walPrimary, walSecondary; 146 private WAL.Reader reader; 147 148 @BeforeClass 149 public static void setUpBeforeClass() throws Exception { 150 TEST_UTIL = new HBaseTestingUtility(); 151 TEST_UTIL.startMiniDFSCluster(1); 152 } 153 154 @AfterClass 155 public static void tearDownAfterClass() throws Exception { 156 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 157 TEST_UTIL.cleanupTestDir(); 158 TEST_UTIL.shutdownMiniDFSCluster(); 159 } 160 161 @Before 162 public void setUp() throws Exception { 163 CONF = TEST_UTIL.getConfiguration(); 164 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 165 method = name.getMethodName(); 166 tableName = Bytes.toBytes(name.getMethodName()); 167 rootDir = new Path(dir + method); 168 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 169 method = name.getMethodName(); 170 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 171 for (byte[] family : families) { 172 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 173 } 174 htd = builder.build(); 175 176 long time = EnvironmentEdgeManager.currentTime(); 177 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 178 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 179 primaryHri = 180 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 181 secondaryHri = 182 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 183 184 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 185 walPrimary = wals.getWAL(primaryHri); 186 walSecondary = wals.getWAL(secondaryHri); 187 188 rss = mock(RegionServerServices.class); 189 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 190 when(rss.getConfiguration()).thenReturn(CONF); 191 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 192 String string = 193 org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString(); 194 ExecutorService es = new ExecutorService(string); 195 es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1) 196 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 197 when(rss.getExecutorService()).thenReturn(es); 198 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 199 primaryRegion.close(); 200 List<HRegion> regions = new ArrayList<>(); 201 regions.add(primaryRegion); 202 Mockito.doReturn(regions).when(rss).getRegions(); 203 204 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 205 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 206 207 reader = null; 208 } 209 210 @After 211 public void tearDown() throws Exception { 212 if (reader != null) { 213 reader.close(); 214 } 215 216 if (primaryRegion != null) { 217 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 218 } 219 if (secondaryRegion != null) { 220 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 221 } 222 223 EnvironmentEdgeManagerTestHelper.reset(); 224 } 225 226 String getName() { 227 return name.getMethodName(); 228 } 229 230 // Some of the test cases are as follows: 231 // 1. replay flush start marker again 232 // 2. replay flush with smaller seqId than what is there in memstore snapshot 233 // 3. replay flush with larger seqId than what is there in memstore snapshot 234 // 4. replay flush commit without flush prepare. non droppable memstore 235 // 5. replay flush commit without flush prepare. droppable memstore 236 // 6. replay open region event 237 // 7. replay open region event after flush start 238 // 8. replay flush form an earlier seqId (test ignoring seqIds) 239 // 9. start flush does not prevent region from closing. 240 241 @Test 242 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 243 // load some data and flush ensure that the secondary replica will not execute the flush 244 245 // load some data to secondary by replaying 246 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 247 248 verifyData(secondaryRegion, 0, 1000, cq, families); 249 250 // flush region 251 FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true); 252 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 253 254 verifyData(secondaryRegion, 0, 1000, cq, families); 255 256 // close the region, and inspect that it has not flushed 257 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 258 // assert that there are no files (due to flush) 259 for (List<HStoreFile> f : files.values()) { 260 assertTrue(f.isEmpty()); 261 } 262 } 263 264 /** 265 * Tests a case where we replay only a flush start marker, then the region is closed. This region 266 * should not block indefinitely 267 */ 268 @Test 269 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 270 // load some data to primary and flush 271 int start = 0; 272 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 273 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 274 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 275 primaryRegion.flush(true); 276 277 // now replay the edits and the flush marker 278 reader = createWALReaderForPrimary(); 279 280 LOG.info("-- Replaying edits and flush events in secondary"); 281 while (true) { 282 WAL.Entry entry = reader.next(); 283 if (entry == null) { 284 break; 285 } 286 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 287 if (flushDesc != null) { 288 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 289 LOG.info("-- Replaying flush start in secondary"); 290 secondaryRegion.replayWALFlushStartMarker(flushDesc); 291 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 292 LOG.info("-- NOT Replaying flush commit in secondary"); 293 } 294 } else { 295 replayEdit(secondaryRegion, entry); 296 } 297 } 298 299 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 300 // now close the region which should not cause hold because of un-committed flush 301 secondaryRegion.close(); 302 303 // verify that the memstore size is back to what it was 304 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 305 } 306 307 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 308 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 309 return 0; // handled elsewhere 310 } 311 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 312 for (Cell cell : entry.getEdit().getCells()) 313 put.add(cell); 314 put.setDurability(Durability.SKIP_WAL); 315 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 316 region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId()); 317 return Integer.parseInt(Bytes.toString(put.getRow())); 318 } 319 320 WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { 321 return WALFactory.createReader(TEST_UTIL.getTestFileSystem(), 322 AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); 323 } 324 325 @Test 326 public void testBatchReplayWithMultipleNonces() throws IOException { 327 try { 328 MutationReplay[] mutations = new MutationReplay[100]; 329 for (int i = 0; i < 100; i++) { 330 Put put = new Put(Bytes.toBytes(i)); 331 put.setDurability(Durability.SYNC_WAL); 332 for (byte[] familly : this.families) { 333 put.addColumn(familly, this.cq, null); 334 long nonceNum = i / 10; 335 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 336 } 337 } 338 primaryRegion.batchReplay(mutations, 20); 339 } catch (Exception e) { 340 String msg = "Error while replay of batch with multiple nonces. "; 341 LOG.error(msg, e); 342 fail(msg + e.getMessage()); 343 } 344 } 345 346 @Test 347 public void testReplayFlushesAndCompactions() throws IOException { 348 // initiate a secondary region with some data. 349 350 // load some data to primary and flush. 3 flushes and some more unflushed data 351 putDataWithFlushes(primaryRegion, 100, 300, 100); 352 353 // compaction from primary 354 LOG.info("-- Compacting primary, only 1 store"); 355 primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE); 356 357 // now replay the edits and the flush marker 358 reader = createWALReaderForPrimary(); 359 360 LOG.info("-- Replaying edits and flush events in secondary"); 361 int lastReplayed = 0; 362 int expectedStoreFileCount = 0; 363 while (true) { 364 WAL.Entry entry = reader.next(); 365 if (entry == null) { 366 break; 367 } 368 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 369 CompactionDescriptor compactionDesc = 370 WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 371 if (flushDesc != null) { 372 // first verify that everything is replayed and visible before flush event replay 373 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 374 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 375 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 376 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 377 MemStoreSize mss = store.getFlushableSize(); 378 long storeSize = store.getSize(); 379 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 380 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 381 LOG.info("-- Replaying flush start in secondary"); 382 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 383 assertNull(result.result); 384 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 385 386 // assert that the store memstore is smaller now 387 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 388 LOG.info("Memstore size reduced by:" 389 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 390 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 391 392 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 393 LOG.info("-- Replaying flush commit in secondary"); 394 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 395 396 // assert that the flush files are picked 397 expectedStoreFileCount++; 398 for (HStore s : secondaryRegion.getStores()) { 399 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 400 } 401 MemStoreSize newMss = store.getFlushableSize(); 402 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 403 404 // assert that the region memstore is smaller now 405 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 406 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 407 408 // assert that the store sizes are bigger 409 assertTrue(store.getSize() > storeSize); 410 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 411 assertEquals(store.getSize(), store.getStorefilesSize()); 412 } 413 // after replay verify that everything is still visible 414 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 415 } else if (compactionDesc != null) { 416 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 417 418 // assert that the compaction is applied 419 for (HStore store : secondaryRegion.getStores()) { 420 if (store.getColumnFamilyName().equals("cf1")) { 421 assertEquals(1, store.getStorefilesCount()); 422 } else { 423 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 424 } 425 } 426 } else { 427 lastReplayed = replayEdit(secondaryRegion, entry); 428 ; 429 } 430 } 431 432 assertEquals(400 - 1, lastReplayed); 433 LOG.info("-- Verifying edits from secondary"); 434 verifyData(secondaryRegion, 0, 400, cq, families); 435 436 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 437 verifyData(primaryRegion, 0, lastReplayed, cq, families); 438 for (HStore store : primaryRegion.getStores()) { 439 if (store.getColumnFamilyName().equals("cf1")) { 440 assertEquals(1, store.getStorefilesCount()); 441 } else { 442 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 443 } 444 } 445 } 446 447 /** 448 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 449 * equal to, greater or less than the previous flush start marker. 450 */ 451 @Test 452 public void testReplayFlushStartMarkers() throws IOException { 453 // load some data to primary and flush. 1 flush and some more unflushed data 454 putDataWithFlushes(primaryRegion, 100, 100, 100); 455 int numRows = 200; 456 457 // now replay the edits and the flush marker 458 reader = createWALReaderForPrimary(); 459 460 LOG.info("-- Replaying edits and flush events in secondary"); 461 462 FlushDescriptor startFlushDesc = null; 463 464 int lastReplayed = 0; 465 while (true) { 466 WAL.Entry entry = reader.next(); 467 if (entry == null) { 468 break; 469 } 470 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 471 if (flushDesc != null) { 472 // first verify that everything is replayed and visible before flush event replay 473 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 474 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 475 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 476 MemStoreSize mss = store.getFlushableSize(); 477 478 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 479 startFlushDesc = flushDesc; 480 LOG.info("-- Replaying flush start in secondary"); 481 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 482 assertNull(result.result); 483 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 484 assertTrue(regionMemstoreSize > 0); 485 assertTrue(mss.getHeapSize() > 0); 486 487 // assert that the store memstore is smaller now 488 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 489 LOG.info("Memstore size reduced by:" 490 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 491 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 492 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 493 494 } 495 // after replay verify that everything is still visible 496 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 497 } else { 498 lastReplayed = replayEdit(secondaryRegion, entry); 499 } 500 } 501 502 // at this point, there should be some data (rows 0-100) in memstore snapshot 503 // and some more data in memstores (rows 100-200) 504 505 verifyData(secondaryRegion, 0, numRows, cq, families); 506 507 // Test case 1: replay the same flush start marker again 508 LOG.info("-- Replaying same flush start in secondary again"); 509 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 510 assertNull(result); // this should return null. Ignoring the flush start marker 511 // assert that we still have prepared flush with the previous setup. 512 assertNotNull(secondaryRegion.getPrepareFlushResult()); 513 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 514 startFlushDesc.getFlushSequenceNumber()); 515 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 516 verifyData(secondaryRegion, 0, numRows, cq, families); 517 518 // Test case 2: replay a flush start marker with a smaller seqId 519 FlushDescriptor startFlushDescSmallerSeqId = 520 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 521 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 522 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 523 assertNull(result); // this should return null. Ignoring the flush start marker 524 // assert that we still have prepared flush with the previous setup. 525 assertNotNull(secondaryRegion.getPrepareFlushResult()); 526 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 527 startFlushDesc.getFlushSequenceNumber()); 528 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 529 verifyData(secondaryRegion, 0, numRows, cq, families); 530 531 // Test case 3: replay a flush start marker with a larger seqId 532 FlushDescriptor startFlushDescLargerSeqId = 533 clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 534 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 535 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 536 assertNull(result); // this should return null. Ignoring the flush start marker 537 // assert that we still have prepared flush with the previous setup. 538 assertNotNull(secondaryRegion.getPrepareFlushResult()); 539 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 540 startFlushDesc.getFlushSequenceNumber()); 541 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 542 verifyData(secondaryRegion, 0, numRows, cq, families); 543 544 LOG.info("-- Verifying edits from secondary"); 545 verifyData(secondaryRegion, 0, numRows, cq, families); 546 547 LOG.info("-- Verifying edits from primary."); 548 verifyData(primaryRegion, 0, numRows, cq, families); 549 } 550 551 /** 552 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 553 * less than the previous flush start marker. 554 */ 555 @Test 556 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 557 // load some data to primary and flush. 2 flushes and some more unflushed data 558 putDataWithFlushes(primaryRegion, 100, 200, 100); 559 int numRows = 300; 560 561 // now replay the edits and the flush marker 562 reader = createWALReaderForPrimary(); 563 564 LOG.info("-- Replaying edits and flush events in secondary"); 565 FlushDescriptor startFlushDesc = null; 566 FlushDescriptor commitFlushDesc = null; 567 568 int lastReplayed = 0; 569 while (true) { 570 System.out.println(lastReplayed); 571 WAL.Entry entry = reader.next(); 572 if (entry == null) { 573 break; 574 } 575 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 576 if (flushDesc != null) { 577 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 578 // don't replay the first flush start marker, hold on to it, replay the second one 579 if (startFlushDesc == null) { 580 startFlushDesc = flushDesc; 581 } else { 582 LOG.info("-- Replaying flush start in secondary"); 583 startFlushDesc = flushDesc; 584 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 585 assertNull(result.result); 586 } 587 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 588 // do not replay any flush commit yet 589 if (commitFlushDesc == null) { 590 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 591 } 592 } 593 // after replay verify that everything is still visible 594 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 595 } else { 596 lastReplayed = replayEdit(secondaryRegion, entry); 597 } 598 } 599 600 // at this point, there should be some data (rows 0-200) in memstore snapshot 601 // and some more data in memstores (rows 200-300) 602 verifyData(secondaryRegion, 0, numRows, cq, families); 603 604 // no store files in the region 605 int expectedStoreFileCount = 0; 606 for (HStore s : secondaryRegion.getStores()) { 607 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 608 } 609 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 610 611 // Test case 1: replay the a flush commit marker smaller than what we have prepared 612 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 613 + startFlushDesc); 614 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 615 616 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 617 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 618 619 // assert that the flush files are picked 620 expectedStoreFileCount++; 621 for (HStore s : secondaryRegion.getStores()) { 622 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 623 } 624 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 625 MemStoreSize mss = store.getFlushableSize(); 626 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 627 628 // assert that the region memstore is same as before 629 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 630 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 631 632 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 633 634 LOG.info("-- Verifying edits from secondary"); 635 verifyData(secondaryRegion, 0, numRows, cq, families); 636 637 LOG.info("-- Verifying edits from primary."); 638 verifyData(primaryRegion, 0, numRows, cq, families); 639 } 640 641 /** 642 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 643 * larger than the previous flush start marker. 644 */ 645 @Test 646 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 647 // load some data to primary and flush. 1 flush and some more unflushed data 648 putDataWithFlushes(primaryRegion, 100, 100, 100); 649 int numRows = 200; 650 651 // now replay the edits and the flush marker 652 reader = createWALReaderForPrimary(); 653 654 LOG.info("-- Replaying edits and flush events in secondary"); 655 FlushDescriptor startFlushDesc = null; 656 FlushDescriptor commitFlushDesc = null; 657 658 int lastReplayed = 0; 659 while (true) { 660 WAL.Entry entry = reader.next(); 661 if (entry == null) { 662 break; 663 } 664 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 665 if (flushDesc != null) { 666 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 667 if (startFlushDesc == null) { 668 LOG.info("-- Replaying flush start in secondary"); 669 startFlushDesc = flushDesc; 670 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 671 assertNull(result.result); 672 } 673 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 674 // do not replay any flush commit yet 675 // hold on to the flush commit marker but simulate a larger 676 // flush commit seqId 677 commitFlushDesc = FlushDescriptor.newBuilder(flushDesc) 678 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build(); 679 } 680 // after replay verify that everything is still visible 681 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 682 } else { 683 lastReplayed = replayEdit(secondaryRegion, entry); 684 } 685 } 686 687 // at this point, there should be some data (rows 0-100) in memstore snapshot 688 // and some more data in memstores (rows 100-200) 689 verifyData(secondaryRegion, 0, numRows, cq, families); 690 691 // no store files in the region 692 int expectedStoreFileCount = 0; 693 for (HStore s : secondaryRegion.getStores()) { 694 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 695 } 696 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 697 698 // Test case 1: replay the a flush commit marker larger than what we have prepared 699 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 700 + startFlushDesc); 701 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 702 703 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 704 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 705 706 // assert that the flush files are picked 707 expectedStoreFileCount++; 708 for (HStore s : secondaryRegion.getStores()) { 709 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 710 } 711 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 712 MemStoreSize mss = store.getFlushableSize(); 713 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 714 715 // assert that the region memstore is smaller than before, but not empty 716 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 717 assertTrue(newRegionMemstoreSize > 0); 718 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 719 720 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 721 722 LOG.info("-- Verifying edits from secondary"); 723 verifyData(secondaryRegion, 0, numRows, cq, families); 724 725 LOG.info("-- Verifying edits from primary."); 726 verifyData(primaryRegion, 0, numRows, cq, families); 727 } 728 729 /** 730 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 731 * memstore edits should be dropped after the flush commit replay since they should be in flushed 732 * files 733 */ 734 @Test 735 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 736 throws IOException { 737 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 738 } 739 740 /** 741 * Tests the case where we receive a flush commit before receiving any flush prepare markers. The 742 * memstore edits should be not dropped after the flush commit replay since not every edit will be 743 * in flushed files (based on seqId) 744 */ 745 @Test 746 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 747 throws IOException { 748 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 749 } 750 751 /** 752 * Tests the case where we receive a flush commit before receiving any flush prepare markers 753 */ 754 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 755 throws IOException { 756 // load some data to primary and flush. 1 flushes and some more unflushed data. 757 // write more data after flush depending on whether droppableSnapshot 758 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 759 int numRows = droppableMemstore ? 100 : 200; 760 761 // now replay the edits and the flush marker 762 reader = createWALReaderForPrimary(); 763 764 LOG.info("-- Replaying edits and flush events in secondary"); 765 FlushDescriptor commitFlushDesc = null; 766 767 int lastReplayed = 0; 768 while (true) { 769 WAL.Entry entry = reader.next(); 770 if (entry == null) { 771 break; 772 } 773 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 774 if (flushDesc != null) { 775 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 776 // do not replay flush start marker 777 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 778 commitFlushDesc = flushDesc; // hold on to the flush commit marker 779 } 780 // after replay verify that everything is still visible 781 verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families); 782 } else { 783 lastReplayed = replayEdit(secondaryRegion, entry); 784 } 785 } 786 787 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 788 // and some more data in memstores (rows 100-300) 789 verifyData(secondaryRegion, 0, numRows, cq, families); 790 791 // no store files in the region 792 int expectedStoreFileCount = 0; 793 for (HStore s : secondaryRegion.getStores()) { 794 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 795 } 796 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 797 798 // Test case 1: replay a flush commit marker without start flush marker 799 assertNull(secondaryRegion.getPrepareFlushResult()); 800 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 801 802 // ensure all files are visible in secondary 803 for (HStore store : secondaryRegion.getStores()) { 804 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 805 } 806 807 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 808 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 809 810 // assert that the flush files are picked 811 expectedStoreFileCount++; 812 for (HStore s : secondaryRegion.getStores()) { 813 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 814 } 815 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 816 MemStoreSize mss = store.getFlushableSize(); 817 if (droppableMemstore) { 818 // assert that the memstore is dropped 819 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 820 } else { 821 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 822 } 823 824 // assert that the region memstore is same as before (we could not drop) 825 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 826 if (droppableMemstore) { 827 assertTrue(0 == newRegionMemstoreSize); 828 } else { 829 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 830 } 831 832 LOG.info("-- Verifying edits from secondary"); 833 verifyData(secondaryRegion, 0, numRows, cq, families); 834 835 LOG.info("-- Verifying edits from primary."); 836 verifyData(primaryRegion, 0, numRows, cq, families); 837 } 838 839 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 840 return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build(); 841 } 842 843 /** 844 * Tests replaying region open markers from primary region. Checks whether the files are picked up 845 */ 846 @Test 847 public void testReplayRegionOpenEvent() throws IOException { 848 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 849 int numRows = 100; 850 851 // close the region and open again. 852 primaryRegion.close(); 853 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 854 855 // now replay the edits and the flush marker 856 reader = createWALReaderForPrimary(); 857 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 858 859 LOG.info("-- Replaying edits and region events in secondary"); 860 while (true) { 861 WAL.Entry entry = reader.next(); 862 if (entry == null) { 863 break; 864 } 865 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 866 RegionEventDescriptor regionEventDesc = 867 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 868 869 if (flushDesc != null) { 870 // don't replay flush events 871 } else if (regionEventDesc != null) { 872 regionEvents.add(regionEventDesc); 873 } else { 874 // don't replay edits 875 } 876 } 877 878 // we should have 1 open, 1 close and 1 open event 879 assertEquals(3, regionEvents.size()); 880 881 // replay the first region open event. 882 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 883 884 // replay the close event as well 885 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 886 887 // no store files in the region 888 int expectedStoreFileCount = 0; 889 for (HStore s : secondaryRegion.getStores()) { 890 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 891 } 892 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 893 assertTrue(regionMemstoreSize == 0); 894 895 // now replay the region open event that should contain new file locations 896 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 897 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 898 899 // assert that the flush files are picked 900 expectedStoreFileCount++; 901 for (HStore s : secondaryRegion.getStores()) { 902 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 903 } 904 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 905 MemStoreSize mss = store.getFlushableSize(); 906 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 907 908 // assert that the region memstore is empty 909 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 910 assertTrue(newRegionMemstoreSize == 0); 911 912 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 913 // any 914 915 LOG.info("-- Verifying edits from secondary"); 916 verifyData(secondaryRegion, 0, numRows, cq, families); 917 918 LOG.info("-- Verifying edits from primary."); 919 verifyData(primaryRegion, 0, numRows, cq, families); 920 } 921 922 /** 923 * Tests the case where we replay a region open event after a flush start but before receiving 924 * flush commit 925 */ 926 @Test 927 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 928 putDataWithFlushes(primaryRegion, 100, 100, 100); 929 int numRows = 200; 930 931 // close the region and open again. 932 primaryRegion.close(); 933 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 934 935 // now replay the edits and the flush marker 936 reader = createWALReaderForPrimary(); 937 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 938 939 LOG.info("-- Replaying edits and region events in secondary"); 940 while (true) { 941 WAL.Entry entry = reader.next(); 942 if (entry == null) { 943 break; 944 } 945 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 946 RegionEventDescriptor regionEventDesc = 947 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 948 949 if (flushDesc != null) { 950 // only replay flush start 951 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 952 secondaryRegion.replayWALFlushStartMarker(flushDesc); 953 } 954 } else if (regionEventDesc != null) { 955 regionEvents.add(regionEventDesc); 956 } else { 957 replayEdit(secondaryRegion, entry); 958 } 959 } 960 961 // at this point, there should be some data (rows 0-100) in the memstore snapshot 962 // and some more data in memstores (rows 100-200) 963 verifyData(secondaryRegion, 0, numRows, cq, families); 964 965 // we should have 1 open, 1 close and 1 open event 966 assertEquals(3, regionEvents.size()); 967 968 // no store files in the region 969 int expectedStoreFileCount = 0; 970 for (HStore s : secondaryRegion.getStores()) { 971 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 972 } 973 974 // now replay the region open event that should contain new file locations 975 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 976 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 977 978 // assert that the flush files are picked 979 expectedStoreFileCount = 2; // two flushes happened 980 for (HStore s : secondaryRegion.getStores()) { 981 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 982 } 983 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 984 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 985 assertTrue(newSnapshotSize.getDataSize() == 0); 986 987 // assert that the region memstore is empty 988 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 989 assertTrue(newRegionMemstoreSize == 0); 990 991 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if 992 // any 993 994 LOG.info("-- Verifying edits from secondary"); 995 verifyData(secondaryRegion, 0, numRows, cq, families); 996 997 LOG.info("-- Verifying edits from primary."); 998 verifyData(primaryRegion, 0, numRows, cq, families); 999 } 1000 1001 /** 1002 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1003 * of the last replayed region open event. 1004 */ 1005 @Test 1006 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1007 putDataWithFlushes(primaryRegion, 100, 100, 0); 1008 int numRows = 100; 1009 1010 // close the region and open again. 1011 primaryRegion.close(); 1012 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1013 1014 // now replay the edits and the flush marker 1015 reader = createWALReaderForPrimary(); 1016 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1017 List<WAL.Entry> edits = Lists.newArrayList(); 1018 1019 LOG.info("-- Replaying edits and region events in secondary"); 1020 while (true) { 1021 WAL.Entry entry = reader.next(); 1022 if (entry == null) { 1023 break; 1024 } 1025 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1026 RegionEventDescriptor regionEventDesc = 1027 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1028 1029 if (flushDesc != null) { 1030 // don't replay flushes 1031 } else if (regionEventDesc != null) { 1032 regionEvents.add(regionEventDesc); 1033 } else { 1034 edits.add(entry); 1035 } 1036 } 1037 1038 // replay the region open of first open, but with the seqid of the second open 1039 // this way non of the flush files will be picked up. 1040 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0)) 1041 .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build()); 1042 1043 // replay edits from the before region close. If replay does not 1044 // skip these the following verification will NOT fail. 1045 for (WAL.Entry entry : edits) { 1046 replayEdit(secondaryRegion, entry); 1047 } 1048 1049 boolean expectedFail = false; 1050 try { 1051 verifyData(secondaryRegion, 0, numRows, cq, families); 1052 } catch (AssertionError e) { 1053 expectedFail = true; // expected 1054 } 1055 if (!expectedFail) { 1056 fail("Should have failed this verification"); 1057 } 1058 } 1059 1060 @Test 1061 public void testReplayFlushSeqIds() throws IOException { 1062 // load some data to primary and flush 1063 int start = 0; 1064 LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100)); 1065 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1066 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1067 primaryRegion.flush(true); 1068 1069 // now replay the flush marker 1070 reader = createWALReaderForPrimary(); 1071 1072 long flushSeqId = -1; 1073 LOG.info("-- Replaying flush events in secondary"); 1074 while (true) { 1075 WAL.Entry entry = reader.next(); 1076 if (entry == null) { 1077 break; 1078 } 1079 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1080 if (flushDesc != null) { 1081 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1082 LOG.info("-- Replaying flush start in secondary"); 1083 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1084 flushSeqId = flushDesc.getFlushSequenceNumber(); 1085 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1086 LOG.info("-- Replaying flush commit in secondary"); 1087 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1088 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1089 } 1090 } 1091 // else do not replay 1092 } 1093 1094 // TODO: what to do with this? 1095 // assert that the newly picked up flush file is visible 1096 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1097 assertEquals(flushSeqId, readPoint); 1098 1099 // after replay verify that everything is still visible 1100 verifyData(secondaryRegion, 0, 100, cq, families); 1101 } 1102 1103 @Test 1104 public void testSeqIdsFromReplay() throws IOException { 1105 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1106 // original seqIds and they are made visible through mvcc read point upon replay 1107 String method = name.getMethodName(); 1108 byte[] tableName = Bytes.toBytes(method); 1109 byte[] family = Bytes.toBytes("family"); 1110 1111 HRegion region = initHRegion(tableName, method, family); 1112 try { 1113 // replay an entry that is bigger than current read point 1114 long readPoint = region.getMVCC().getReadPoint(); 1115 long origSeqId = readPoint + 100; 1116 1117 Put put = new Put(row).addColumn(family, row, row); 1118 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1119 replay(region, put, origSeqId); 1120 1121 // read point should have advanced to this seqId 1122 assertGet(region, family, row); 1123 1124 // region seqId should have advanced at least to this seqId 1125 assertEquals(origSeqId, region.getReadPoint(null)); 1126 1127 // replay an entry that is smaller than current read point 1128 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1129 // replay does not allow reads while replay is going on. 1130 put = new Put(row2).addColumn(family, row2, row2); 1131 put.setDurability(Durability.SKIP_WAL); 1132 replay(region, put, origSeqId - 50); 1133 1134 assertGet(region, family, row2); 1135 } finally { 1136 region.close(); 1137 } 1138 } 1139 1140 /** 1141 * Tests that a region opened in secondary mode would not write region open / close events to its 1142 * WAL. n 1143 */ 1144 @Test 1145 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1146 secondaryRegion.close(); 1147 walSecondary = spy(walSecondary); 1148 1149 // test for region open and close 1150 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1151 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1152 any(WALEdit.class)); 1153 1154 // test for replay prepare flush 1155 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1156 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder() 1157 .setFlushSequenceNumber(10) 1158 .setTableName(UnsafeByteOperations 1159 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1160 .setAction(FlushAction.START_FLUSH) 1161 .setEncodedRegionName( 1162 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1163 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1164 .build()); 1165 1166 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1167 any(WALEdit.class)); 1168 1169 secondaryRegion.close(); 1170 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1171 any(WALEdit.class)); 1172 } 1173 1174 /** 1175 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1176 */ 1177 @Test 1178 public void testRegionReadsEnabledFlag() throws IOException { 1179 1180 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1181 1182 verifyData(secondaryRegion, 0, 100, cq, families); 1183 1184 // now disable reads 1185 secondaryRegion.setReadsEnabled(false); 1186 try { 1187 verifyData(secondaryRegion, 0, 100, cq, families); 1188 fail("Should have failed with IOException"); 1189 } catch (IOException ex) { 1190 // expected 1191 } 1192 1193 // verify that we can still replay data 1194 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1195 1196 // now enable reads again 1197 secondaryRegion.setReadsEnabled(true); 1198 verifyData(secondaryRegion, 0, 200, cq, families); 1199 } 1200 1201 /** 1202 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1203 * It should write the flush request marker instead. 1204 */ 1205 @Test 1206 public void testWriteFlushRequestMarker() throws IOException { 1207 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1208 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1209 assertNotNull(result); 1210 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1211 assertFalse(result.wroteFlushWalMarker); 1212 1213 // request flush again, but this time with writeFlushRequestWalMarker = true 1214 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1215 assertNotNull(result); 1216 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1217 assertTrue(result.wroteFlushWalMarker); 1218 1219 List<FlushDescriptor> flushes = Lists.newArrayList(); 1220 reader = createWALReaderForPrimary(); 1221 while (true) { 1222 WAL.Entry entry = reader.next(); 1223 if (entry == null) { 1224 break; 1225 } 1226 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1227 if (flush != null) { 1228 flushes.add(flush); 1229 } 1230 } 1231 1232 assertEquals(1, flushes.size()); 1233 assertNotNull(flushes.get(0)); 1234 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1235 } 1236 1237 /** 1238 * Test the case where the secondary region replica is not in reads enabled state because it is 1239 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush 1240 * marker entry should restore the reads enabled status in the region and allow the reads to 1241 * continue. 1242 */ 1243 @Test 1244 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1245 disableReads(secondaryRegion); 1246 1247 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1248 // triggered flush restores readsEnabled 1249 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1250 reader = createWALReaderForPrimary(); 1251 while (true) { 1252 WAL.Entry entry = reader.next(); 1253 if (entry == null) { 1254 break; 1255 } 1256 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1257 if (flush != null) { 1258 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1259 } 1260 } 1261 1262 // now reads should be enabled 1263 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1264 } 1265 1266 /** 1267 * Test the case where the secondary region replica is not in reads enabled state because it is 1268 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1269 * entries should restore the reads enabled status in the region and allow the reads to continue. 1270 */ 1271 @Test 1272 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1273 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1274 // from triggered flush restores readsEnabled 1275 disableReads(secondaryRegion); 1276 1277 // put some data in primary 1278 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1279 primaryRegion.flush(true); 1280 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1281 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1282 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1283 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1284 // but can't figure it... and this is only test that seems to suffer this flush issue. 1285 // St.Ack 20160201 1286 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1287 1288 reader = createWALReaderForPrimary(); 1289 while (true) { 1290 WAL.Entry entry = reader.next(); 1291 LOG.info(Objects.toString(entry)); 1292 if (entry == null) { 1293 break; 1294 } 1295 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1296 if (flush != null) { 1297 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1298 } else { 1299 replayEdit(secondaryRegion, entry); 1300 } 1301 } 1302 1303 // now reads should be enabled 1304 verifyData(secondaryRegion, 0, 100, cq, families); 1305 } 1306 1307 /** 1308 * Test the case where the secondary region replica is not in reads enabled state because it is 1309 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1310 * entries should restore the reads enabled status in the region and allow the reads to continue. 1311 */ 1312 @Test 1313 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1314 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1315 // from triggered flush restores readsEnabled 1316 disableReads(secondaryRegion); 1317 1318 // put some data in primary 1319 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1320 primaryRegion.flush(true); 1321 1322 reader = createWALReaderForPrimary(); 1323 while (true) { 1324 WAL.Entry entry = reader.next(); 1325 if (entry == null) { 1326 break; 1327 } 1328 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1329 if (flush != null) { 1330 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1331 } 1332 } 1333 1334 // now reads should be enabled 1335 verifyData(secondaryRegion, 0, 100, cq, families); 1336 } 1337 1338 /** 1339 * Test the case where the secondary region replica is not in reads enabled state because it is 1340 * waiting for a flush or region open marker from primary region. Replaying region open event 1341 * entry from primary should restore the reads enabled status in the region and allow the reads to 1342 * continue. 1343 */ 1344 @Test 1345 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1346 // Test case 3: Test that replaying region open event markers restores readsEnabled 1347 disableReads(secondaryRegion); 1348 1349 primaryRegion.close(); 1350 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1351 1352 reader = createWALReaderForPrimary(); 1353 while (true) { 1354 WAL.Entry entry = reader.next(); 1355 if (entry == null) { 1356 break; 1357 } 1358 1359 RegionEventDescriptor regionEventDesc = 1360 WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1361 1362 if (regionEventDesc != null) { 1363 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1364 } 1365 } 1366 1367 // now reads should be enabled 1368 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1369 } 1370 1371 @Test 1372 public void testRefresStoreFiles() throws IOException { 1373 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1374 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1375 1376 // Test case 1: refresh with an empty region 1377 secondaryRegion.refreshStoreFiles(); 1378 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1379 1380 // do one flush 1381 putDataWithFlushes(primaryRegion, 100, 100, 0); 1382 int numRows = 100; 1383 1384 // refresh the store file list, and ensure that the files are picked up. 1385 secondaryRegion.refreshStoreFiles(); 1386 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1387 secondaryRegion.getStoreFileList(families)); 1388 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1389 1390 LOG.info("-- Verifying edits from secondary"); 1391 verifyData(secondaryRegion, 0, numRows, cq, families); 1392 1393 // Test case 2: 3 some more flushes 1394 putDataWithFlushes(primaryRegion, 100, 300, 0); 1395 numRows = 300; 1396 1397 // refresh the store file list, and ensure that the files are picked up. 1398 secondaryRegion.refreshStoreFiles(); 1399 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1400 secondaryRegion.getStoreFileList(families)); 1401 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1402 1403 LOG.info("-- Verifying edits from secondary"); 1404 verifyData(secondaryRegion, 0, numRows, cq, families); 1405 1406 if (FSUtils.WINDOWS) { 1407 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1408 return; 1409 } 1410 1411 // Test case 3: compact primary files 1412 primaryRegion.compactStores(); 1413 List<HRegion> regions = new ArrayList<>(); 1414 regions.add(primaryRegion); 1415 Mockito.doReturn(regions).when(rss).getRegions(); 1416 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1417 cleaner.chore(); 1418 secondaryRegion.refreshStoreFiles(); 1419 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1420 secondaryRegion.getStoreFileList(families)); 1421 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1422 1423 LOG.info("-- Verifying edits from secondary"); 1424 verifyData(secondaryRegion, 0, numRows, cq, families); 1425 1426 LOG.info("-- Replaying edits in secondary"); 1427 1428 // Test case 4: replay some edits, ensure that memstore is dropped. 1429 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1430 putDataWithFlushes(primaryRegion, 400, 400, 0); 1431 numRows = 400; 1432 1433 reader = createWALReaderForPrimary(); 1434 while (true) { 1435 WAL.Entry entry = reader.next(); 1436 if (entry == null) { 1437 break; 1438 } 1439 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1440 if (flush != null) { 1441 // do not replay flush 1442 } else { 1443 replayEdit(secondaryRegion, entry); 1444 } 1445 } 1446 1447 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1448 1449 secondaryRegion.refreshStoreFiles(); 1450 1451 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1452 1453 LOG.info("-- Verifying edits from primary"); 1454 verifyData(primaryRegion, 0, numRows, cq, families); 1455 LOG.info("-- Verifying edits from secondary"); 1456 verifyData(secondaryRegion, 0, numRows, cq, families); 1457 } 1458 1459 /** 1460 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1461 */ 1462 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1463 List<Path> l1 = new ArrayList<>(list1.size()); 1464 for (String path : list1) { 1465 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1466 } 1467 List<Path> l2 = new ArrayList<>(list2.size()); 1468 for (String path : list2) { 1469 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1470 } 1471 assertEquals(l1, l2); 1472 } 1473 1474 private void disableReads(HRegion region) { 1475 region.setReadsEnabled(false); 1476 try { 1477 verifyData(region, 0, 1, cq, families); 1478 fail("Should have failed with IOException"); 1479 } catch (IOException ex) { 1480 // expected 1481 } 1482 } 1483 1484 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1485 put.setDurability(Durability.SKIP_WAL); 1486 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1487 region.batchReplay(new MutationReplay[] { mutation }, replaySeqId); 1488 } 1489 1490 /** 1491 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1492 */ 1493 @Test 1494 public void testReplayBulkLoadEvent() throws IOException { 1495 LOG.info("testReplayBulkLoadEvent starts"); 1496 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1497 1498 // close the region and open again. 1499 primaryRegion.close(); 1500 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1501 1502 // bulk load a file into primary region 1503 byte[] randomValues = new byte[20]; 1504 Bytes.random(randomValues); 1505 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1506 1507 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1508 int expectedLoadFileCount = 0; 1509 for (byte[] family : families) { 1510 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1511 expectedLoadFileCount++; 1512 } 1513 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1514 1515 // now replay the edits and the bulk load marker 1516 reader = createWALReaderForPrimary(); 1517 1518 LOG.info("-- Replaying edits and region events in secondary"); 1519 BulkLoadDescriptor bulkloadEvent = null; 1520 while (true) { 1521 WAL.Entry entry = reader.next(); 1522 if (entry == null) { 1523 break; 1524 } 1525 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1526 if (bulkloadEvent != null) { 1527 break; 1528 } 1529 } 1530 1531 // we should have 1 bulk load event 1532 assertTrue(bulkloadEvent != null); 1533 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1534 1535 // replay the bulk load event 1536 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1537 1538 List<String> storeFileName = new ArrayList<>(); 1539 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1540 storeFileName.addAll(storeDesc.getStoreFileList()); 1541 } 1542 // assert that the bulk loaded files are picked 1543 for (HStore s : secondaryRegion.getStores()) { 1544 for (HStoreFile sf : s.getStorefiles()) { 1545 storeFileName.remove(sf.getPath().getName()); 1546 } 1547 } 1548 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); 1549 1550 LOG.info("-- Verifying edits from secondary"); 1551 for (byte[] family : families) { 1552 assertGet(secondaryRegion, family, randomValues); 1553 } 1554 } 1555 1556 @Test 1557 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1558 // tests replaying flush commit marker, but the flush file has already been compacted 1559 // from primary and also deleted from the archive directory 1560 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder() 1561 .setFlushSequenceNumber(Long.MAX_VALUE) 1562 .setTableName(UnsafeByteOperations 1563 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1564 .setAction(FlushAction.COMMIT_FLUSH) 1565 .setEncodedRegionName( 1566 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1567 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1568 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1569 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1570 .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build()) 1571 .build()); 1572 } 1573 1574 @Test 1575 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1576 // tests replaying compaction marker, but the compaction output file has already been compacted 1577 // from primary and also deleted from the archive directory 1578 secondaryRegion 1579 .replayWALCompactionMarker( 1580 CompactionDescriptor.newBuilder() 1581 .setTableName(UnsafeByteOperations 1582 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1583 .setEncodedRegionName( 1584 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1585 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123") 1586 .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir") 1587 .setRegionName( 1588 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1589 .build(), 1590 true, true, Long.MAX_VALUE); 1591 } 1592 1593 @Test 1594 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1595 // tests replaying region open event marker, but the region files have already been compacted 1596 // from primary and also deleted from the archive directory 1597 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1598 .setTableName(UnsafeByteOperations 1599 .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1600 .setEncodedRegionName( 1601 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1602 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1603 .setEventType(EventType.REGION_OPEN) 1604 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1605 .setLogSequenceNumber(Long.MAX_VALUE) 1606 .addStores( 1607 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1608 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1609 .build()); 1610 } 1611 1612 @Test 1613 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1614 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1615 // from primary and also deleted from the archive directory 1616 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1617 .setTableName( 1618 ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1619 .setEncodedRegionName( 1620 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1621 .setBulkloadSeqNum(Long.MAX_VALUE) 1622 .addStores( 1623 StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1624 .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build()) 1625 .build()); 1626 } 1627 1628 private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes) 1629 throws IOException { 1630 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1631 // TODO We need a way to do this without creating files 1632 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1633 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1634 try { 1635 hFileFactory.withOutputStream(out); 1636 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1637 HFile.Writer writer = hFileFactory.create(); 1638 try { 1639 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L, 1640 KeyValue.Type.Put.getCode(), valueBytes))); 1641 } finally { 1642 writer.close(); 1643 } 1644 } finally { 1645 out.close(); 1646 } 1647 return testFile.toString(); 1648 } 1649 1650 /** 1651 * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush 1652 * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but 1653 * does not execute flush after n 1654 */ 1655 private void putDataWithFlushes(HRegion region, int flushInterval, int numRows, 1656 int numRowsAfterFlush) throws IOException { 1657 int start = 0; 1658 for (; start < numRows; start += flushInterval) { 1659 LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval)); 1660 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1661 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1662 region.flush(true); 1663 } 1664 LOG.info("-- Writing some more data to primary, not flushing"); 1665 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1666 } 1667 1668 private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf, 1669 byte[]... families) throws IOException { 1670 for (int i = startRow; i < startRow + numRows; i++) { 1671 Put put = new Put(Bytes.toBytes("" + i)); 1672 put.setDurability(Durability.SKIP_WAL); 1673 for (byte[] family : families) { 1674 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1675 } 1676 replay(region, put, i + 1); 1677 } 1678 } 1679 1680 private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families) 1681 throws IOException { 1682 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1683 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families); 1684 } 1685 1686 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, 1687 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1688 byte[]... families) throws IOException { 1689 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, 1690 isReadOnly, durability, wal, families); 1691 } 1692}