001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet; 021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData; 022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029import static org.mockito.ArgumentMatchers.any; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.spy; 032import static org.mockito.Mockito.times; 033import static org.mockito.Mockito.verify; 034import static org.mockito.Mockito.when; 035 036import java.io.FileNotFoundException; 037import java.io.IOException; 038import java.util.ArrayList; 039import java.util.List; 040import java.util.Map; 041import java.util.Objects; 042import java.util.Random; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FSDataOutputStream; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.CellUtil; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.HConstants; 051import org.apache.hadoop.hbase.KeyValue; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.Durability; 056import org.apache.hadoop.hbase.client.Get; 057import org.apache.hadoop.hbase.client.Put; 058import org.apache.hadoop.hbase.client.RegionInfo; 059import org.apache.hadoop.hbase.client.RegionInfoBuilder; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.executor.ExecutorService; 063import org.apache.hadoop.hbase.io.hfile.HFile; 064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 065import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; 066import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; 067import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; 068import org.apache.hadoop.hbase.testclassification.LargeTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 072import org.apache.hadoop.hbase.util.FSUtils; 073import org.apache.hadoop.hbase.util.Pair; 074import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 075import org.apache.hadoop.hbase.wal.WAL; 076import org.apache.hadoop.hbase.wal.WALEdit; 077import org.apache.hadoop.hbase.wal.WALFactory; 078import org.apache.hadoop.hbase.wal.WALKeyImpl; 079import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; 080import org.apache.hadoop.util.StringUtils; 081import org.junit.After; 082import org.junit.AfterClass; 083import org.junit.Before; 084import org.junit.BeforeClass; 085import org.junit.ClassRule; 086import org.junit.Rule; 087import org.junit.Test; 088import org.junit.experimental.categories.Category; 089import org.junit.rules.TestName; 090import org.mockito.Mockito; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 095import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 096 097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; 107 108/** 109 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary 110 * region replicas 111 */ 112@Category(LargeTests.class) 113public class TestHRegionReplayEvents { 114 115 @ClassRule 116 public static final HBaseClassTestRule CLASS_RULE = 117 HBaseClassTestRule.forClass(TestHRegionReplayEvents.class); 118 119 private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class); 120 @Rule public TestName name = new TestName(); 121 122 private static HBaseTestingUtility TEST_UTIL; 123 124 public static Configuration CONF; 125 private String dir; 126 127 private byte[][] families = new byte[][] { 128 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")}; 129 130 // Test names 131 protected byte[] tableName; 132 protected String method; 133 protected final byte[] row = Bytes.toBytes("rowA"); 134 protected final byte[] row2 = Bytes.toBytes("rowB"); 135 protected byte[] cq = Bytes.toBytes("cq"); 136 137 // per test fields 138 private Path rootDir; 139 private TableDescriptor htd; 140 private RegionServerServices rss; 141 private RegionInfo primaryHri, secondaryHri; 142 private HRegion primaryRegion, secondaryRegion; 143 private WAL walPrimary, walSecondary; 144 private WAL.Reader reader; 145 146 @BeforeClass 147 public static void setUpBeforeClass() throws Exception { 148 TEST_UTIL = new HBaseTestingUtility(); 149 TEST_UTIL.startMiniDFSCluster(1); 150 } 151 152 @AfterClass 153 public static void tearDownAfterClass() throws Exception { 154 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); 155 TEST_UTIL.cleanupTestDir(); 156 TEST_UTIL.shutdownMiniDFSCluster(); 157 } 158 159 @Before 160 public void setUp() throws Exception { 161 CONF = TEST_UTIL.getConfiguration(); 162 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString(); 163 method = name.getMethodName(); 164 tableName = Bytes.toBytes(name.getMethodName()); 165 rootDir = new Path(dir + method); 166 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); 167 method = name.getMethodName(); 168 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method)); 169 for (byte[] family : families) { 170 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); 171 } 172 htd = builder.build(); 173 174 long time = System.currentTimeMillis(); 175 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 176 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 177 primaryHri = 178 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build(); 179 secondaryHri = 180 RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build(); 181 182 WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir); 183 walPrimary = wals.getWAL(primaryHri); 184 walSecondary = wals.getWAL(secondaryHri); 185 186 rss = mock(RegionServerServices.class); 187 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 188 when(rss.getConfiguration()).thenReturn(CONF); 189 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); 190 String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER 191 .toString(); 192 ExecutorService es = new ExecutorService(string); 193 es.startExecutorService( 194 string+"-"+string, 1); 195 when(rss.getExecutorService()).thenReturn(es); 196 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); 197 primaryRegion.close(); 198 List<HRegion> regions = new ArrayList<>(); 199 regions.add(primaryRegion); 200 Mockito.doReturn(regions).when(rss).getRegions(); 201 202 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 203 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); 204 205 reader = null; 206 } 207 208 @After 209 public void tearDown() throws Exception { 210 if (reader != null) { 211 reader.close(); 212 } 213 214 if (primaryRegion != null) { 215 HBaseTestingUtility.closeRegionAndWAL(primaryRegion); 216 } 217 if (secondaryRegion != null) { 218 HBaseTestingUtility.closeRegionAndWAL(secondaryRegion); 219 } 220 221 EnvironmentEdgeManagerTestHelper.reset(); 222 } 223 224 String getName() { 225 return name.getMethodName(); 226 } 227 228 // Some of the test cases are as follows: 229 // 1. replay flush start marker again 230 // 2. replay flush with smaller seqId than what is there in memstore snapshot 231 // 3. replay flush with larger seqId than what is there in memstore snapshot 232 // 4. replay flush commit without flush prepare. non droppable memstore 233 // 5. replay flush commit without flush prepare. droppable memstore 234 // 6. replay open region event 235 // 7. replay open region event after flush start 236 // 8. replay flush form an earlier seqId (test ignoring seqIds) 237 // 9. start flush does not prevent region from closing. 238 239 @Test 240 public void testRegionReplicaSecondaryCannotFlush() throws IOException { 241 // load some data and flush ensure that the secondary replica will not execute the flush 242 243 // load some data to secondary by replaying 244 putDataByReplay(secondaryRegion, 0, 1000, cq, families); 245 246 verifyData(secondaryRegion, 0, 1000, cq, families); 247 248 // flush region 249 FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true); 250 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result); 251 252 verifyData(secondaryRegion, 0, 1000, cq, families); 253 254 // close the region, and inspect that it has not flushed 255 Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false); 256 // assert that there are no files (due to flush) 257 for (List<HStoreFile> f : files.values()) { 258 assertTrue(f.isEmpty()); 259 } 260 } 261 262 /** 263 * Tests a case where we replay only a flush start marker, then the region is closed. This region 264 * should not block indefinitely 265 */ 266 @Test 267 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException { 268 // load some data to primary and flush 269 int start = 0; 270 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); 271 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 272 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 273 primaryRegion.flush(true); 274 275 // now replay the edits and the flush marker 276 reader = createWALReaderForPrimary(); 277 278 LOG.info("-- Replaying edits and flush events in secondary"); 279 while (true) { 280 WAL.Entry entry = reader.next(); 281 if (entry == null) { 282 break; 283 } 284 FlushDescriptor flushDesc 285 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 286 if (flushDesc != null) { 287 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 288 LOG.info("-- Replaying flush start in secondary"); 289 secondaryRegion.replayWALFlushStartMarker(flushDesc); 290 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 291 LOG.info("-- NOT Replaying flush commit in secondary"); 292 } 293 } else { 294 replayEdit(secondaryRegion, entry); 295 } 296 } 297 298 assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0); 299 // now close the region which should not cause hold because of un-committed flush 300 secondaryRegion.close(); 301 302 // verify that the memstore size is back to what it was 303 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize()); 304 } 305 306 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { 307 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) { 308 return 0; // handled elsewhere 309 } 310 Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0))); 311 for (Cell cell : entry.getEdit().getCells()) put.add(cell); 312 put.setDurability(Durability.SKIP_WAL); 313 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 314 region.batchReplay(new MutationReplay[] {mutation}, 315 entry.getKey().getSequenceId()); 316 return Integer.parseInt(Bytes.toString(put.getRow())); 317 } 318 319 WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { 320 return WALFactory.createReader(TEST_UTIL.getTestFileSystem(), 321 AbstractFSWALProvider.getCurrentFileName(walPrimary), 322 TEST_UTIL.getConfiguration()); 323 } 324 325 @Test 326 public void testBatchReplayWithMultipleNonces() throws IOException { 327 try { 328 MutationReplay[] mutations = new MutationReplay[100]; 329 for (int i = 0; i < 100; i++) { 330 Put put = new Put(Bytes.toBytes(i)); 331 put.setDurability(Durability.SYNC_WAL); 332 for (byte[] familly : this.families) { 333 put.addColumn(familly, this.cq, null); 334 long nonceNum = i / 10; 335 mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum); 336 } 337 } 338 primaryRegion.batchReplay(mutations, 20); 339 } catch (Exception e) { 340 String msg = "Error while replay of batch with multiple nonces. "; 341 LOG.error(msg, e); 342 fail(msg + e.getMessage()); 343 } 344 } 345 346 @Test 347 public void testReplayFlushesAndCompactions() throws IOException { 348 // initiate a secondary region with some data. 349 350 // load some data to primary and flush. 3 flushes and some more unflushed data 351 putDataWithFlushes(primaryRegion, 100, 300, 100); 352 353 // compaction from primary 354 LOG.info("-- Compacting primary, only 1 store"); 355 primaryRegion.compactStore(Bytes.toBytes("cf1"), 356 NoLimitThroughputController.INSTANCE); 357 358 // now replay the edits and the flush marker 359 reader = createWALReaderForPrimary(); 360 361 LOG.info("-- Replaying edits and flush events in secondary"); 362 int lastReplayed = 0; 363 int expectedStoreFileCount = 0; 364 while (true) { 365 WAL.Entry entry = reader.next(); 366 if (entry == null) { 367 break; 368 } 369 FlushDescriptor flushDesc 370 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 371 CompactionDescriptor compactionDesc 372 = WALEdit.getCompaction(entry.getEdit().getCells().get(0)); 373 if (flushDesc != null) { 374 // first verify that everything is replayed and visible before flush event replay 375 verifyData(secondaryRegion, 0, lastReplayed, cq, families); 376 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 377 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 378 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 379 MemStoreSize mss = store.getFlushableSize(); 380 long storeSize = store.getSize(); 381 long storeSizeUncompressed = store.getStoreSizeUncompressed(); 382 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 383 LOG.info("-- Replaying flush start in secondary"); 384 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc); 385 assertNull(result.result); 386 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber()); 387 388 // assert that the store memstore is smaller now 389 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 390 LOG.info("Memstore size reduced by:" 391 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 392 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 393 394 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 395 LOG.info("-- Replaying flush commit in secondary"); 396 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 397 398 // assert that the flush files are picked 399 expectedStoreFileCount++; 400 for (HStore s : secondaryRegion.getStores()) { 401 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 402 } 403 MemStoreSize newMss = store.getFlushableSize(); 404 assertTrue(mss.getHeapSize() > newMss.getHeapSize()); 405 406 // assert that the region memstore is smaller now 407 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 408 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 409 410 // assert that the store sizes are bigger 411 assertTrue(store.getSize() > storeSize); 412 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed); 413 assertEquals(store.getSize(), store.getStorefilesSize()); 414 } 415 // after replay verify that everything is still visible 416 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 417 } else if (compactionDesc != null) { 418 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE); 419 420 // assert that the compaction is applied 421 for (HStore store : secondaryRegion.getStores()) { 422 if (store.getColumnFamilyName().equals("cf1")) { 423 assertEquals(1, store.getStorefilesCount()); 424 } else { 425 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 426 } 427 } 428 } else { 429 lastReplayed = replayEdit(secondaryRegion, entry);; 430 } 431 } 432 433 assertEquals(400-1, lastReplayed); 434 LOG.info("-- Verifying edits from secondary"); 435 verifyData(secondaryRegion, 0, 400, cq, families); 436 437 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted"); 438 verifyData(primaryRegion, 0, lastReplayed, cq, families); 439 for (HStore store : primaryRegion.getStores()) { 440 if (store.getColumnFamilyName().equals("cf1")) { 441 assertEquals(1, store.getStorefilesCount()); 442 } else { 443 assertEquals(expectedStoreFileCount, store.getStorefilesCount()); 444 } 445 } 446 } 447 448 /** 449 * Tests cases where we prepare a flush with some seqId and we receive other flush start markers 450 * equal to, greater or less than the previous flush start marker. 451 */ 452 @Test 453 public void testReplayFlushStartMarkers() throws IOException { 454 // load some data to primary and flush. 1 flush and some more unflushed data 455 putDataWithFlushes(primaryRegion, 100, 100, 100); 456 int numRows = 200; 457 458 // now replay the edits and the flush marker 459 reader = createWALReaderForPrimary(); 460 461 LOG.info("-- Replaying edits and flush events in secondary"); 462 463 FlushDescriptor startFlushDesc = null; 464 465 int lastReplayed = 0; 466 while (true) { 467 WAL.Entry entry = reader.next(); 468 if (entry == null) { 469 break; 470 } 471 FlushDescriptor flushDesc 472 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 473 if (flushDesc != null) { 474 // first verify that everything is replayed and visible before flush event replay 475 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 476 long storeMemstoreSize = store.getMemStoreSize().getHeapSize(); 477 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 478 MemStoreSize mss = store.getFlushableSize(); 479 480 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 481 startFlushDesc = flushDesc; 482 LOG.info("-- Replaying flush start in secondary"); 483 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 484 assertNull(result.result); 485 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber()); 486 assertTrue(regionMemstoreSize > 0); 487 assertTrue(mss.getHeapSize() > 0); 488 489 // assert that the store memstore is smaller now 490 long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize(); 491 LOG.info("Memstore size reduced by:" 492 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize)); 493 assertTrue(storeMemstoreSize > newStoreMemstoreSize); 494 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 495 496 } 497 // after replay verify that everything is still visible 498 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 499 } else { 500 lastReplayed = replayEdit(secondaryRegion, entry); 501 } 502 } 503 504 // at this point, there should be some data (rows 0-100) in memstore snapshot 505 // and some more data in memstores (rows 100-200) 506 507 verifyData(secondaryRegion, 0, numRows, cq, families); 508 509 // Test case 1: replay the same flush start marker again 510 LOG.info("-- Replaying same flush start in secondary again"); 511 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 512 assertNull(result); // this should return null. Ignoring the flush start marker 513 // assert that we still have prepared flush with the previous setup. 514 assertNotNull(secondaryRegion.getPrepareFlushResult()); 515 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 516 startFlushDesc.getFlushSequenceNumber()); 517 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 518 verifyData(secondaryRegion, 0, numRows, cq, families); 519 520 // Test case 2: replay a flush start marker with a smaller seqId 521 FlushDescriptor startFlushDescSmallerSeqId 522 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50); 523 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId); 524 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId); 525 assertNull(result); // this should return null. Ignoring the flush start marker 526 // assert that we still have prepared flush with the previous setup. 527 assertNotNull(secondaryRegion.getPrepareFlushResult()); 528 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 529 startFlushDesc.getFlushSequenceNumber()); 530 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 531 verifyData(secondaryRegion, 0, numRows, cq, families); 532 533 // Test case 3: replay a flush start marker with a larger seqId 534 FlushDescriptor startFlushDescLargerSeqId 535 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50); 536 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId); 537 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId); 538 assertNull(result); // this should return null. Ignoring the flush start marker 539 // assert that we still have prepared flush with the previous setup. 540 assertNotNull(secondaryRegion.getPrepareFlushResult()); 541 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId, 542 startFlushDesc.getFlushSequenceNumber()); 543 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty 544 verifyData(secondaryRegion, 0, numRows, cq, families); 545 546 LOG.info("-- Verifying edits from secondary"); 547 verifyData(secondaryRegion, 0, numRows, cq, families); 548 549 LOG.info("-- Verifying edits from primary."); 550 verifyData(primaryRegion, 0, numRows, cq, families); 551 } 552 553 /** 554 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 555 * less than the previous flush start marker. 556 */ 557 @Test 558 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException { 559 // load some data to primary and flush. 2 flushes and some more unflushed data 560 putDataWithFlushes(primaryRegion, 100, 200, 100); 561 int numRows = 300; 562 563 // now replay the edits and the flush marker 564 reader = createWALReaderForPrimary(); 565 566 LOG.info("-- Replaying edits and flush events in secondary"); 567 FlushDescriptor startFlushDesc = null; 568 FlushDescriptor commitFlushDesc = null; 569 570 int lastReplayed = 0; 571 while (true) { 572 System.out.println(lastReplayed); 573 WAL.Entry entry = reader.next(); 574 if (entry == null) { 575 break; 576 } 577 FlushDescriptor flushDesc 578 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 579 if (flushDesc != null) { 580 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 581 // don't replay the first flush start marker, hold on to it, replay the second one 582 if (startFlushDesc == null) { 583 startFlushDesc = flushDesc; 584 } else { 585 LOG.info("-- Replaying flush start in secondary"); 586 startFlushDesc = flushDesc; 587 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 588 assertNull(result.result); 589 } 590 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 591 // do not replay any flush commit yet 592 if (commitFlushDesc == null) { 593 commitFlushDesc = flushDesc; // hold on to the first flush commit marker 594 } 595 } 596 // after replay verify that everything is still visible 597 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 598 } else { 599 lastReplayed = replayEdit(secondaryRegion, entry); 600 } 601 } 602 603 // at this point, there should be some data (rows 0-200) in memstore snapshot 604 // and some more data in memstores (rows 200-300) 605 verifyData(secondaryRegion, 0, numRows, cq, families); 606 607 // no store files in the region 608 int expectedStoreFileCount = 0; 609 for (HStore s : secondaryRegion.getStores()) { 610 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 611 } 612 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 613 614 // Test case 1: replay the a flush commit marker smaller than what we have prepared 615 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 616 + startFlushDesc); 617 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber()); 618 619 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 620 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 621 622 // assert that the flush files are picked 623 expectedStoreFileCount++; 624 for (HStore s : secondaryRegion.getStores()) { 625 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 626 } 627 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 628 MemStoreSize mss = store.getFlushableSize(); 629 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 630 631 // assert that the region memstore is same as before 632 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 633 assertEquals(regionMemstoreSize, newRegionMemstoreSize); 634 635 assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped 636 637 LOG.info("-- Verifying edits from secondary"); 638 verifyData(secondaryRegion, 0, numRows, cq, families); 639 640 LOG.info("-- Verifying edits from primary."); 641 verifyData(primaryRegion, 0, numRows, cq, families); 642 } 643 644 /** 645 * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker 646 * larger than the previous flush start marker. 647 */ 648 @Test 649 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException { 650 // load some data to primary and flush. 1 flush and some more unflushed data 651 putDataWithFlushes(primaryRegion, 100, 100, 100); 652 int numRows = 200; 653 654 // now replay the edits and the flush marker 655 reader = createWALReaderForPrimary(); 656 657 LOG.info("-- Replaying edits and flush events in secondary"); 658 FlushDescriptor startFlushDesc = null; 659 FlushDescriptor commitFlushDesc = null; 660 661 int lastReplayed = 0; 662 while (true) { 663 WAL.Entry entry = reader.next(); 664 if (entry == null) { 665 break; 666 } 667 FlushDescriptor flushDesc 668 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 669 if (flushDesc != null) { 670 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 671 if (startFlushDesc == null) { 672 LOG.info("-- Replaying flush start in secondary"); 673 startFlushDesc = flushDesc; 674 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc); 675 assertNull(result.result); 676 } 677 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 678 // do not replay any flush commit yet 679 // hold on to the flush commit marker but simulate a larger 680 // flush commit seqId 681 commitFlushDesc = 682 FlushDescriptor.newBuilder(flushDesc) 683 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50) 684 .build(); 685 } 686 // after replay verify that everything is still visible 687 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 688 } else { 689 lastReplayed = replayEdit(secondaryRegion, entry); 690 } 691 } 692 693 // at this point, there should be some data (rows 0-100) in memstore snapshot 694 // and some more data in memstores (rows 100-200) 695 verifyData(secondaryRegion, 0, numRows, cq, families); 696 697 // no store files in the region 698 int expectedStoreFileCount = 0; 699 for (HStore s : secondaryRegion.getStores()) { 700 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 701 } 702 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 703 704 // Test case 1: replay the a flush commit marker larger than what we have prepared 705 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START" 706 + startFlushDesc); 707 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber()); 708 709 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 710 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 711 712 // assert that the flush files are picked 713 expectedStoreFileCount++; 714 for (HStore s : secondaryRegion.getStores()) { 715 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 716 } 717 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 718 MemStoreSize mss = store.getFlushableSize(); 719 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 720 721 // assert that the region memstore is smaller than before, but not empty 722 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 723 assertTrue(newRegionMemstoreSize > 0); 724 assertTrue(regionMemstoreSize > newRegionMemstoreSize); 725 726 assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped 727 728 LOG.info("-- Verifying edits from secondary"); 729 verifyData(secondaryRegion, 0, numRows, cq, families); 730 731 LOG.info("-- Verifying edits from primary."); 732 verifyData(primaryRegion, 0, numRows, cq, families); 733 } 734 735 /** 736 * Tests the case where we receive a flush commit before receiving any flush prepare markers. 737 * The memstore edits should be dropped after the flush commit replay since they should be in 738 * flushed files 739 */ 740 @Test 741 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore() 742 throws IOException { 743 testReplayFlushCommitMarkerWithoutFlushStartMarker(true); 744 } 745 746 /** 747 * Tests the case where we receive a flush commit before receiving any flush prepare markers. 748 * The memstore edits should be not dropped after the flush commit replay since not every edit 749 * will be in flushed files (based on seqId) 750 */ 751 @Test 752 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore() 753 throws IOException { 754 testReplayFlushCommitMarkerWithoutFlushStartMarker(false); 755 } 756 757 /** 758 * Tests the case where we receive a flush commit before receiving any flush prepare markers 759 */ 760 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore) 761 throws IOException { 762 // load some data to primary and flush. 1 flushes and some more unflushed data. 763 // write more data after flush depending on whether droppableSnapshot 764 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100); 765 int numRows = droppableMemstore ? 100 : 200; 766 767 // now replay the edits and the flush marker 768 reader = createWALReaderForPrimary(); 769 770 LOG.info("-- Replaying edits and flush events in secondary"); 771 FlushDescriptor commitFlushDesc = null; 772 773 int lastReplayed = 0; 774 while (true) { 775 WAL.Entry entry = reader.next(); 776 if (entry == null) { 777 break; 778 } 779 FlushDescriptor flushDesc 780 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 781 if (flushDesc != null) { 782 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 783 // do not replay flush start marker 784 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 785 commitFlushDesc = flushDesc; // hold on to the flush commit marker 786 } 787 // after replay verify that everything is still visible 788 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families); 789 } else { 790 lastReplayed = replayEdit(secondaryRegion, entry); 791 } 792 } 793 794 // at this point, there should be some data (rows 0-200) in the memstore without snapshot 795 // and some more data in memstores (rows 100-300) 796 verifyData(secondaryRegion, 0, numRows, cq, families); 797 798 // no store files in the region 799 int expectedStoreFileCount = 0; 800 for (HStore s : secondaryRegion.getStores()) { 801 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 802 } 803 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 804 805 // Test case 1: replay a flush commit marker without start flush marker 806 assertNull(secondaryRegion.getPrepareFlushResult()); 807 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0); 808 809 // ensure all files are visible in secondary 810 for (HStore store : secondaryRegion.getStores()) { 811 assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null)); 812 } 813 814 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc); 815 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc); 816 817 // assert that the flush files are picked 818 expectedStoreFileCount++; 819 for (HStore s : secondaryRegion.getStores()) { 820 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 821 } 822 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 823 MemStoreSize mss = store.getFlushableSize(); 824 if (droppableMemstore) { 825 // assert that the memstore is dropped 826 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 827 } else { 828 assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped 829 } 830 831 // assert that the region memstore is same as before (we could not drop) 832 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 833 if (droppableMemstore) { 834 assertTrue(0 == newRegionMemstoreSize); 835 } else { 836 assertTrue(regionMemstoreSize == newRegionMemstoreSize); 837 } 838 839 LOG.info("-- Verifying edits from secondary"); 840 verifyData(secondaryRegion, 0, numRows, cq, families); 841 842 LOG.info("-- Verifying edits from primary."); 843 verifyData(primaryRegion, 0, numRows, cq, families); 844 } 845 846 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) { 847 return FlushDescriptor.newBuilder(flush) 848 .setFlushSequenceNumber(flushSeqId) 849 .build(); 850 } 851 852 /** 853 * Tests replaying region open markers from primary region. Checks whether the files are picked up 854 */ 855 @Test 856 public void testReplayRegionOpenEvent() throws IOException { 857 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 858 int numRows = 100; 859 860 // close the region and open again. 861 primaryRegion.close(); 862 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 863 864 // now replay the edits and the flush marker 865 reader = createWALReaderForPrimary(); 866 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 867 868 LOG.info("-- Replaying edits and region events in secondary"); 869 while (true) { 870 WAL.Entry entry = reader.next(); 871 if (entry == null) { 872 break; 873 } 874 FlushDescriptor flushDesc 875 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 876 RegionEventDescriptor regionEventDesc 877 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 878 879 if (flushDesc != null) { 880 // don't replay flush events 881 } else if (regionEventDesc != null) { 882 regionEvents.add(regionEventDesc); 883 } else { 884 // don't replay edits 885 } 886 } 887 888 // we should have 1 open, 1 close and 1 open event 889 assertEquals(3, regionEvents.size()); 890 891 // replay the first region open event. 892 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0)); 893 894 // replay the close event as well 895 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1)); 896 897 // no store files in the region 898 int expectedStoreFileCount = 0; 899 for (HStore s : secondaryRegion.getStores()) { 900 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 901 } 902 long regionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 903 assertTrue(regionMemstoreSize == 0); 904 905 // now replay the region open event that should contain new file locations 906 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 907 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 908 909 // assert that the flush files are picked 910 expectedStoreFileCount++; 911 for (HStore s : secondaryRegion.getStores()) { 912 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 913 } 914 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 915 MemStoreSize mss = store.getFlushableSize(); 916 assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD); 917 918 // assert that the region memstore is empty 919 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 920 assertTrue(newRegionMemstoreSize == 0); 921 922 assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any 923 924 LOG.info("-- Verifying edits from secondary"); 925 verifyData(secondaryRegion, 0, numRows, cq, families); 926 927 LOG.info("-- Verifying edits from primary."); 928 verifyData(primaryRegion, 0, numRows, cq, families); 929 } 930 931 /** 932 * Tests the case where we replay a region open event after a flush start but before receiving 933 * flush commit 934 */ 935 @Test 936 public void testReplayRegionOpenEventAfterFlushStart() throws IOException { 937 putDataWithFlushes(primaryRegion, 100, 100, 100); 938 int numRows = 200; 939 940 // close the region and open again. 941 primaryRegion.close(); 942 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 943 944 // now replay the edits and the flush marker 945 reader = createWALReaderForPrimary(); 946 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 947 948 LOG.info("-- Replaying edits and region events in secondary"); 949 while (true) { 950 WAL.Entry entry = reader.next(); 951 if (entry == null) { 952 break; 953 } 954 FlushDescriptor flushDesc 955 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 956 RegionEventDescriptor regionEventDesc 957 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 958 959 if (flushDesc != null) { 960 // only replay flush start 961 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 962 secondaryRegion.replayWALFlushStartMarker(flushDesc); 963 } 964 } else if (regionEventDesc != null) { 965 regionEvents.add(regionEventDesc); 966 } else { 967 replayEdit(secondaryRegion, entry); 968 } 969 } 970 971 // at this point, there should be some data (rows 0-100) in the memstore snapshot 972 // and some more data in memstores (rows 100-200) 973 verifyData(secondaryRegion, 0, numRows, cq, families); 974 975 // we should have 1 open, 1 close and 1 open event 976 assertEquals(3, regionEvents.size()); 977 978 // no store files in the region 979 int expectedStoreFileCount = 0; 980 for (HStore s : secondaryRegion.getStores()) { 981 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 982 } 983 984 // now replay the region open event that should contain new file locations 985 LOG.info("Testing replaying region open event " + regionEvents.get(2)); 986 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2)); 987 988 // assert that the flush files are picked 989 expectedStoreFileCount = 2; // two flushes happened 990 for (HStore s : secondaryRegion.getStores()) { 991 assertEquals(expectedStoreFileCount, s.getStorefilesCount()); 992 } 993 HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1")); 994 MemStoreSize newSnapshotSize = store.getSnapshotSize(); 995 assertTrue(newSnapshotSize.getDataSize() == 0); 996 997 // assert that the region memstore is empty 998 long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize(); 999 assertTrue(newRegionMemstoreSize == 0); 1000 1001 assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any 1002 1003 LOG.info("-- Verifying edits from secondary"); 1004 verifyData(secondaryRegion, 0, numRows, cq, families); 1005 1006 LOG.info("-- Verifying edits from primary."); 1007 verifyData(primaryRegion, 0, numRows, cq, families); 1008 } 1009 1010 /** 1011 * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId 1012 * of the last replayed region open event. 1013 */ 1014 @Test 1015 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException { 1016 putDataWithFlushes(primaryRegion, 100, 100, 0); 1017 int numRows = 100; 1018 1019 // close the region and open again. 1020 primaryRegion.close(); 1021 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1022 1023 // now replay the edits and the flush marker 1024 reader = createWALReaderForPrimary(); 1025 List<RegionEventDescriptor> regionEvents = Lists.newArrayList(); 1026 List<WAL.Entry> edits = Lists.newArrayList(); 1027 1028 LOG.info("-- Replaying edits and region events in secondary"); 1029 while (true) { 1030 WAL.Entry entry = reader.next(); 1031 if (entry == null) { 1032 break; 1033 } 1034 FlushDescriptor flushDesc 1035 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1036 RegionEventDescriptor regionEventDesc 1037 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1038 1039 if (flushDesc != null) { 1040 // don't replay flushes 1041 } else if (regionEventDesc != null) { 1042 regionEvents.add(regionEventDesc); 1043 } else { 1044 edits.add(entry); 1045 } 1046 } 1047 1048 // replay the region open of first open, but with the seqid of the second open 1049 // this way non of the flush files will be picked up. 1050 secondaryRegion.replayWALRegionEventMarker( 1051 RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber( 1052 regionEvents.get(2).getLogSequenceNumber()).build()); 1053 1054 1055 // replay edits from the before region close. If replay does not 1056 // skip these the following verification will NOT fail. 1057 for (WAL.Entry entry: edits) { 1058 replayEdit(secondaryRegion, entry); 1059 } 1060 1061 boolean expectedFail = false; 1062 try { 1063 verifyData(secondaryRegion, 0, numRows, cq, families); 1064 } catch (AssertionError e) { 1065 expectedFail = true; // expected 1066 } 1067 if (!expectedFail) { 1068 fail("Should have failed this verification"); 1069 } 1070 } 1071 1072 @Test 1073 public void testReplayFlushSeqIds() throws IOException { 1074 // load some data to primary and flush 1075 int start = 0; 1076 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100)); 1077 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families); 1078 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1079 primaryRegion.flush(true); 1080 1081 // now replay the flush marker 1082 reader = createWALReaderForPrimary(); 1083 1084 long flushSeqId = -1; 1085 LOG.info("-- Replaying flush events in secondary"); 1086 while (true) { 1087 WAL.Entry entry = reader.next(); 1088 if (entry == null) { 1089 break; 1090 } 1091 FlushDescriptor flushDesc 1092 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1093 if (flushDesc != null) { 1094 if (flushDesc.getAction() == FlushAction.START_FLUSH) { 1095 LOG.info("-- Replaying flush start in secondary"); 1096 secondaryRegion.replayWALFlushStartMarker(flushDesc); 1097 flushSeqId = flushDesc.getFlushSequenceNumber(); 1098 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { 1099 LOG.info("-- Replaying flush commit in secondary"); 1100 secondaryRegion.replayWALFlushCommitMarker(flushDesc); 1101 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber()); 1102 } 1103 } 1104 // else do not replay 1105 } 1106 1107 // TODO: what to do with this? 1108 // assert that the newly picked up flush file is visible 1109 long readPoint = secondaryRegion.getMVCC().getReadPoint(); 1110 assertEquals(flushSeqId, readPoint); 1111 1112 // after replay verify that everything is still visible 1113 verifyData(secondaryRegion, 0, 100, cq, families); 1114 } 1115 1116 @Test 1117 public void testSeqIdsFromReplay() throws IOException { 1118 // test the case where seqId's coming from replayed WALEdits are made persisted with their 1119 // original seqIds and they are made visible through mvcc read point upon replay 1120 String method = name.getMethodName(); 1121 byte[] tableName = Bytes.toBytes(method); 1122 byte[] family = Bytes.toBytes("family"); 1123 1124 HRegion region = initHRegion(tableName, method, family); 1125 try { 1126 // replay an entry that is bigger than current read point 1127 long readPoint = region.getMVCC().getReadPoint(); 1128 long origSeqId = readPoint + 100; 1129 1130 Put put = new Put(row).addColumn(family, row, row); 1131 put.setDurability(Durability.SKIP_WAL); // we replay with skip wal 1132 replay(region, put, origSeqId); 1133 1134 // read point should have advanced to this seqId 1135 assertGet(region, family, row); 1136 1137 // region seqId should have advanced at least to this seqId 1138 assertEquals(origSeqId, region.getReadPoint(null)); 1139 1140 // replay an entry that is smaller than current read point 1141 // caution: adding an entry below current read point might cause partial dirty reads. Normal 1142 // replay does not allow reads while replay is going on. 1143 put = new Put(row2).addColumn(family, row2, row2); 1144 put.setDurability(Durability.SKIP_WAL); 1145 replay(region, put, origSeqId - 50); 1146 1147 assertGet(region, family, row2); 1148 } finally { 1149 region.close(); 1150 } 1151 } 1152 1153 /** 1154 * Tests that a region opened in secondary mode would not write region open / close 1155 * events to its WAL. 1156 * @throws IOException 1157 */ 1158 @Test 1159 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException { 1160 secondaryRegion.close(); 1161 walSecondary = spy(walSecondary); 1162 1163 // test for region open and close 1164 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); 1165 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1166 any(WALEdit.class)); 1167 1168 // test for replay prepare flush 1169 putDataByReplay(secondaryRegion, 0, 10, cq, families); 1170 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder(). 1171 setFlushSequenceNumber(10) 1172 .setTableName(UnsafeByteOperations.unsafeWrap( 1173 primaryRegion.getTableDescriptor().getTableName().getName())) 1174 .setAction(FlushAction.START_FLUSH) 1175 .setEncodedRegionName( 1176 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1177 .setRegionName(UnsafeByteOperations.unsafeWrap( 1178 primaryRegion.getRegionInfo().getRegionName())) 1179 .build()); 1180 1181 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1182 any(WALEdit.class)); 1183 1184 secondaryRegion.close(); 1185 verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class), 1186 any(WALEdit.class)); 1187 } 1188 1189 /** 1190 * Tests the reads enabled flag for the region. When unset all reads should be rejected 1191 */ 1192 @Test 1193 public void testRegionReadsEnabledFlag() throws IOException { 1194 1195 putDataByReplay(secondaryRegion, 0, 100, cq, families); 1196 1197 verifyData(secondaryRegion, 0, 100, cq, families); 1198 1199 // now disable reads 1200 secondaryRegion.setReadsEnabled(false); 1201 try { 1202 verifyData(secondaryRegion, 0, 100, cq, families); 1203 fail("Should have failed with IOException"); 1204 } catch(IOException ex) { 1205 // expected 1206 } 1207 1208 // verify that we can still replay data 1209 putDataByReplay(secondaryRegion, 100, 100, cq, families); 1210 1211 // now enable reads again 1212 secondaryRegion.setReadsEnabled(true); 1213 verifyData(secondaryRegion, 0, 200, cq, families); 1214 } 1215 1216 /** 1217 * Tests the case where a request for flush cache is sent to the region, but region cannot flush. 1218 * It should write the flush request marker instead. 1219 */ 1220 @Test 1221 public void testWriteFlushRequestMarker() throws IOException { 1222 // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false 1223 FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY); 1224 assertNotNull(result); 1225 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1226 assertFalse(result.wroteFlushWalMarker); 1227 1228 // request flush again, but this time with writeFlushRequestWalMarker = true 1229 result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1230 assertNotNull(result); 1231 assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result); 1232 assertTrue(result.wroteFlushWalMarker); 1233 1234 List<FlushDescriptor> flushes = Lists.newArrayList(); 1235 reader = createWALReaderForPrimary(); 1236 while (true) { 1237 WAL.Entry entry = reader.next(); 1238 if (entry == null) { 1239 break; 1240 } 1241 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1242 if (flush != null) { 1243 flushes.add(flush); 1244 } 1245 } 1246 1247 assertEquals(1, flushes.size()); 1248 assertNotNull(flushes.get(0)); 1249 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction()); 1250 } 1251 1252 /** 1253 * Test the case where the secondary region replica is not in reads enabled state because it is 1254 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH 1255 * flush marker entry should restore the reads enabled status in the region and allow the reads 1256 * to continue. 1257 */ 1258 @Test 1259 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException { 1260 disableReads(secondaryRegion); 1261 1262 // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from 1263 // triggered flush restores readsEnabled 1264 primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 1265 reader = createWALReaderForPrimary(); 1266 while (true) { 1267 WAL.Entry entry = reader.next(); 1268 if (entry == null) { 1269 break; 1270 } 1271 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1272 if (flush != null) { 1273 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1274 } 1275 } 1276 1277 // now reads should be enabled 1278 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1279 } 1280 1281 /** 1282 * Test the case where the secondary region replica is not in reads enabled state because it is 1283 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1284 * entries should restore the reads enabled status in the region and allow the reads 1285 * to continue. 1286 */ 1287 @Test 1288 public void testReplayingFlushRestoresReadsEnabledState() throws IOException { 1289 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1290 // from triggered flush restores readsEnabled 1291 disableReads(secondaryRegion); 1292 1293 // put some data in primary 1294 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1295 primaryRegion.flush(true); 1296 // I seem to need to push more edits through so the WAL flushes on local fs. This was not 1297 // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I 1298 // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.. 1299 // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up 1300 // but can't figure it... and this is only test that seems to suffer this flush issue. 1301 // St.Ack 20160201 1302 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1303 1304 reader = createWALReaderForPrimary(); 1305 while (true) { 1306 WAL.Entry entry = reader.next(); 1307 LOG.info(Objects.toString(entry)); 1308 if (entry == null) { 1309 break; 1310 } 1311 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1312 if (flush != null) { 1313 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1314 } else { 1315 replayEdit(secondaryRegion, entry); 1316 } 1317 } 1318 1319 // now reads should be enabled 1320 verifyData(secondaryRegion, 0, 100, cq, families); 1321 } 1322 1323 /** 1324 * Test the case where the secondary region replica is not in reads enabled state because it is 1325 * waiting for a flush or region open marker from primary region. Replaying flush start and commit 1326 * entries should restore the reads enabled status in the region and allow the reads 1327 * to continue. 1328 */ 1329 @Test 1330 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException { 1331 // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came 1332 // from triggered flush restores readsEnabled 1333 disableReads(secondaryRegion); 1334 1335 // put some data in primary 1336 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families); 1337 primaryRegion.flush(true); 1338 1339 reader = createWALReaderForPrimary(); 1340 while (true) { 1341 WAL.Entry entry = reader.next(); 1342 if (entry == null) { 1343 break; 1344 } 1345 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1346 if (flush != null) { 1347 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); 1348 } 1349 } 1350 1351 // now reads should be enabled 1352 verifyData(secondaryRegion, 0, 100, cq, families); 1353 } 1354 1355 /** 1356 * Test the case where the secondary region replica is not in reads enabled state because it is 1357 * waiting for a flush or region open marker from primary region. Replaying region open event 1358 * entry from primary should restore the reads enabled status in the region and allow the reads 1359 * to continue. 1360 */ 1361 @Test 1362 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException { 1363 // Test case 3: Test that replaying region open event markers restores readsEnabled 1364 disableReads(secondaryRegion); 1365 1366 primaryRegion.close(); 1367 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1368 1369 reader = createWALReaderForPrimary(); 1370 while (true) { 1371 WAL.Entry entry = reader.next(); 1372 if (entry == null) { 1373 break; 1374 } 1375 1376 RegionEventDescriptor regionEventDesc 1377 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0)); 1378 1379 if (regionEventDesc != null) { 1380 secondaryRegion.replayWALRegionEventMarker(regionEventDesc); 1381 } 1382 } 1383 1384 // now reads should be enabled 1385 secondaryRegion.get(new Get(Bytes.toBytes(0))); 1386 } 1387 1388 @Test 1389 public void testRefresStoreFiles() throws IOException { 1390 assertEquals(0, primaryRegion.getStoreFileList(families).size()); 1391 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1392 1393 // Test case 1: refresh with an empty region 1394 secondaryRegion.refreshStoreFiles(); 1395 assertEquals(0, secondaryRegion.getStoreFileList(families).size()); 1396 1397 // do one flush 1398 putDataWithFlushes(primaryRegion, 100, 100, 0); 1399 int numRows = 100; 1400 1401 // refresh the store file list, and ensure that the files are picked up. 1402 secondaryRegion.refreshStoreFiles(); 1403 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1404 secondaryRegion.getStoreFileList(families)); 1405 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1406 1407 LOG.info("-- Verifying edits from secondary"); 1408 verifyData(secondaryRegion, 0, numRows, cq, families); 1409 1410 // Test case 2: 3 some more flushes 1411 putDataWithFlushes(primaryRegion, 100, 300, 0); 1412 numRows = 300; 1413 1414 // refresh the store file list, and ensure that the files are picked up. 1415 secondaryRegion.refreshStoreFiles(); 1416 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1417 secondaryRegion.getStoreFileList(families)); 1418 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); 1419 1420 LOG.info("-- Verifying edits from secondary"); 1421 verifyData(secondaryRegion, 0, numRows, cq, families); 1422 1423 if (FSUtils.WINDOWS) { 1424 // compaction cannot move files while they are open in secondary on windows. Skip remaining. 1425 return; 1426 } 1427 1428 // Test case 3: compact primary files 1429 primaryRegion.compactStores(); 1430 List<HRegion> regions = new ArrayList<>(); 1431 regions.add(primaryRegion); 1432 Mockito.doReturn(regions).when(rss).getRegions(); 1433 CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false); 1434 cleaner.chore(); 1435 secondaryRegion.refreshStoreFiles(); 1436 assertPathListsEqual(primaryRegion.getStoreFileList(families), 1437 secondaryRegion.getStoreFileList(families)); 1438 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); 1439 1440 LOG.info("-- Verifying edits from secondary"); 1441 verifyData(secondaryRegion, 0, numRows, cq, families); 1442 1443 LOG.info("-- Replaying edits in secondary"); 1444 1445 // Test case 4: replay some edits, ensure that memstore is dropped. 1446 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1447 putDataWithFlushes(primaryRegion, 400, 400, 0); 1448 numRows = 400; 1449 1450 reader = createWALReaderForPrimary(); 1451 while (true) { 1452 WAL.Entry entry = reader.next(); 1453 if (entry == null) { 1454 break; 1455 } 1456 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); 1457 if (flush != null) { 1458 // do not replay flush 1459 } else { 1460 replayEdit(secondaryRegion, entry); 1461 } 1462 } 1463 1464 assertTrue(secondaryRegion.getMemStoreDataSize() > 0); 1465 1466 secondaryRegion.refreshStoreFiles(); 1467 1468 assertTrue(secondaryRegion.getMemStoreDataSize() == 0); 1469 1470 LOG.info("-- Verifying edits from primary"); 1471 verifyData(primaryRegion, 0, numRows, cq, families); 1472 LOG.info("-- Verifying edits from secondary"); 1473 verifyData(secondaryRegion, 0, numRows, cq, families); 1474 } 1475 1476 /** 1477 * Paths can be qualified or not. This does the assertion using String->Path conversion. 1478 */ 1479 private void assertPathListsEqual(List<String> list1, List<String> list2) { 1480 List<Path> l1 = new ArrayList<>(list1.size()); 1481 for (String path : list1) { 1482 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1483 } 1484 List<Path> l2 = new ArrayList<>(list2.size()); 1485 for (String path : list2) { 1486 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); 1487 } 1488 assertEquals(l1, l2); 1489 } 1490 1491 private void disableReads(HRegion region) { 1492 region.setReadsEnabled(false); 1493 try { 1494 verifyData(region, 0, 1, cq, families); 1495 fail("Should have failed with IOException"); 1496 } catch(IOException ex) { 1497 // expected 1498 } 1499 } 1500 1501 private void replay(HRegion region, Put put, long replaySeqId) throws IOException { 1502 put.setDurability(Durability.SKIP_WAL); 1503 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); 1504 region.batchReplay(new MutationReplay[] {mutation}, replaySeqId); 1505 } 1506 1507 /** 1508 * Tests replaying region open markers from primary region. Checks whether the files are picked up 1509 */ 1510 @Test 1511 public void testReplayBulkLoadEvent() throws IOException { 1512 LOG.info("testReplayBulkLoadEvent starts"); 1513 putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush 1514 1515 // close the region and open again. 1516 primaryRegion.close(); 1517 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); 1518 1519 // bulk load a file into primary region 1520 Random random = new Random(); 1521 byte[] randomValues = new byte[20]; 1522 random.nextBytes(randomValues); 1523 Path testPath = TEST_UTIL.getDataTestDirOnTestFS(); 1524 1525 List<Pair<byte[], String>> familyPaths = new ArrayList<>(); 1526 int expectedLoadFileCount = 0; 1527 for (byte[] family : families) { 1528 familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues))); 1529 expectedLoadFileCount++; 1530 } 1531 primaryRegion.bulkLoadHFiles(familyPaths, false, null); 1532 1533 // now replay the edits and the bulk load marker 1534 reader = createWALReaderForPrimary(); 1535 1536 LOG.info("-- Replaying edits and region events in secondary"); 1537 BulkLoadDescriptor bulkloadEvent = null; 1538 while (true) { 1539 WAL.Entry entry = reader.next(); 1540 if (entry == null) { 1541 break; 1542 } 1543 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0)); 1544 if (bulkloadEvent != null) { 1545 break; 1546 } 1547 } 1548 1549 // we should have 1 bulk load event 1550 assertTrue(bulkloadEvent != null); 1551 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount()); 1552 1553 // replay the bulk load event 1554 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent); 1555 1556 1557 List<String> storeFileName = new ArrayList<>(); 1558 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) { 1559 storeFileName.addAll(storeDesc.getStoreFileList()); 1560 } 1561 // assert that the bulk loaded files are picked 1562 for (HStore s : secondaryRegion.getStores()) { 1563 for (HStoreFile sf : s.getStorefiles()) { 1564 storeFileName.remove(sf.getPath().getName()); 1565 } 1566 } 1567 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty()); 1568 1569 LOG.info("-- Verifying edits from secondary"); 1570 for (byte[] family : families) { 1571 assertGet(secondaryRegion, family, randomValues); 1572 } 1573 } 1574 1575 @Test 1576 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException { 1577 // tests replaying flush commit marker, but the flush file has already been compacted 1578 // from primary and also deleted from the archive directory 1579 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder(). 1580 setFlushSequenceNumber(Long.MAX_VALUE) 1581 .setTableName(UnsafeByteOperations.unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName())) 1582 .setAction(FlushAction.COMMIT_FLUSH) 1583 .setEncodedRegionName( 1584 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1585 .setRegionName(UnsafeByteOperations.unsafeWrap( 1586 primaryRegion.getRegionInfo().getRegionName())) 1587 .addStoreFlushes(StoreFlushDescriptor.newBuilder() 1588 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1589 .setStoreHomeDir("/store_home_dir") 1590 .addFlushOutput("/foo/baz/123") 1591 .build()) 1592 .build()); 1593 } 1594 1595 @Test 1596 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException { 1597 // tests replaying compaction marker, but the compaction output file has already been compacted 1598 // from primary and also deleted from the archive directory 1599 secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder() 1600 .setTableName(UnsafeByteOperations.unsafeWrap( 1601 primaryRegion.getTableDescriptor().getTableName().getName())) 1602 .setEncodedRegionName( 1603 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1604 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1605 .addCompactionInput("/123") 1606 .addCompactionOutput("/456") 1607 .setStoreHomeDir("/store_home_dir") 1608 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1609 .build() 1610 , true, true, Long.MAX_VALUE); 1611 } 1612 1613 @Test 1614 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException { 1615 // tests replaying region open event marker, but the region files have already been compacted 1616 // from primary and also deleted from the archive directory 1617 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder() 1618 .setTableName(UnsafeByteOperations.unsafeWrap( 1619 primaryRegion.getTableDescriptor().getTableName().getName())) 1620 .setEncodedRegionName( 1621 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1622 .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName())) 1623 .setEventType(EventType.REGION_OPEN) 1624 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1))) 1625 .setLogSequenceNumber(Long.MAX_VALUE) 1626 .addStores(StoreDescriptor.newBuilder() 1627 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1628 .setStoreHomeDir("/store_home_dir") 1629 .addStoreFile("/123") 1630 .build()) 1631 .build()); 1632 } 1633 1634 @Test 1635 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException { 1636 // tests replaying bulk load event marker, but the bulk load files have already been compacted 1637 // from primary and also deleted from the archive directory 1638 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder() 1639 .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName())) 1640 .setEncodedRegionName( 1641 UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes())) 1642 .setBulkloadSeqNum(Long.MAX_VALUE) 1643 .addStores(StoreDescriptor.newBuilder() 1644 .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])) 1645 .setStoreHomeDir("/store_home_dir") 1646 .addStoreFile("/123") 1647 .build()) 1648 .build()); 1649 } 1650 1651 private String createHFileForFamilies(Path testPath, byte[] family, 1652 byte[] valueBytes) throws IOException { 1653 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration()); 1654 // TODO We need a way to do this without creating files 1655 Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString()); 1656 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile); 1657 try { 1658 hFileFactory.withOutputStream(out); 1659 hFileFactory.withFileContext(new HFileContextBuilder().build()); 1660 HFile.Writer writer = hFileFactory.create(); 1661 try { 1662 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L, 1663 KeyValue.Type.Put.getCode(), valueBytes))); 1664 } finally { 1665 writer.close(); 1666 } 1667 } finally { 1668 out.close(); 1669 } 1670 return testFile.toString(); 1671 } 1672 1673 /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does 1674 * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of 1675 * more rows but does not execute flush after 1676 * @throws IOException */ 1677 private void putDataWithFlushes(HRegion region, int flushInterval, 1678 int numRows, int numRowsAfterFlush) throws IOException { 1679 int start = 0; 1680 for (; start < numRows; start += flushInterval) { 1681 LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval)); 1682 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families); 1683 LOG.info("-- Flushing primary, creating 3 files for 3 stores"); 1684 region.flush(true); 1685 } 1686 LOG.info("-- Writing some more data to primary, not flushing"); 1687 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families); 1688 } 1689 1690 private void putDataByReplay(HRegion region, 1691 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { 1692 for (int i = startRow; i < startRow + numRows; i++) { 1693 Put put = new Put(Bytes.toBytes("" + i)); 1694 put.setDurability(Durability.SKIP_WAL); 1695 for (byte[] family : families) { 1696 put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null); 1697 } 1698 replay(region, put, i+1); 1699 } 1700 } 1701 1702 private static HRegion initHRegion(byte[] tableName, 1703 String callingMethod, byte[]... families) throws IOException { 1704 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, 1705 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families); 1706 } 1707 1708 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, 1709 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, 1710 WAL wal, byte[]... families) throws IOException { 1711 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, 1712 isReadOnly, durability, wal, families); 1713 } 1714}