/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for
 * secondary region replicas.
 */
@Category(MediumTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);

  @Rule
  public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private long time;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WALFactory wals;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;
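
  // Test fixture: setup() below creates two HRegion instances over the same table and region
  // data: a primary (replica id 0) attached to its own WAL, and a secondary (replica id 1)
  // opened without a WAL of its own. The RegionServerServices is a Mockito mock, so both
  // regions can run outside a real region server.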
  @Before
  public void setup() throws IOException {
    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    time = System.currentTimeMillis();
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    primaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string =
        org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.
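
  // Most tests below follow the same pattern: write to the primary, then tail the primary's
  // WAL with createWALReaderForPrimary() and dispatch each entry by hand, roughly (a sketch of
  // the loop used throughout, not itself a runnable test):
  //
  //   WAL.Entry entry = reader.next();
  //   FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
  //   if (flush != null) {
  //     secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
  //   } else {
  //     replayEdit(secondaryRegion, entry);
  //   }
  //
  // Individual tests replay, skip, or mutate specific markers to simulate out-of-order and
  // partial delivery to the secondary replica.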

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that no store files were created by a flush
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region, which should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }
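
  // replayEdit() below is the workhorse for data edits: meta edits (flush/compaction/region
  // event markers) are skipped with a return value of 0; everything else is re-applied through
  // batchReplay() using the entry's original sequence id, so the secondary's MVCC read point
  // advances to the primary's seqIds. The returned int is the row index parsed back out of the
  // row key, which tests use to track how far replay has progressed.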
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
        entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
        AbstractFSWALProvider.getCurrentFileName(walPrimary),
        TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
        }
        // every 10 consecutive puts share the same nonce group and nonce
        long nonceNum = i / 10;
        mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replaying batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }
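
  // Note on putDataWithFlushes (a helper defined further down in this class, outside this
  // excerpt): as used here, putDataWithFlushes(region, flushInterval, numRows, numRowsAfterFlush)
  // appears to write numRows rows in flushInterval-sized batches, flushing after each batch, and
  // then write numRowsAfterFlush more rows without flushing -- e.g. (100, 300, 100) yields
  // 3 flushes plus 100 unflushed rows, matching the comments in the tests below.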
  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
          WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked up
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater than, or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in the memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have the prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
        clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have the prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
        clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have the prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
        startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
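
  // The two tests below exercise flush COMMIT markers whose seqId disagrees with the prepared
  // flush START marker. Expected secondary behavior (asserted in the tests): a commit with a
  // smaller seqId picks up the flush files but keeps the prepared snapshot and memstore intact;
  // a commit with a larger seqId picks up the files and drops the prepared snapshot, since the
  // commit covers everything in it.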

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in the memstore snapshot
    // and some more data in the memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked up
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is the same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
                  .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
                  .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in the memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked up
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on droppableMemstore
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the replayed data (rows 0-numRows) is in the memstore, and there is
    // no prepared snapshot since no flush start marker was replayed
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without a start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked up
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped in the droppable case, and unchanged otherwise
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertEquals(0, newRegionMemstoreSize);
    } else {
      assertEquals(regionMemstoreSize, newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
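
  // Small helper used by the marker tests above: copies a FlushDescriptor verbatim but
  // overrides its flushSequenceNumber, to simulate markers arriving with out-of-order or
  // fabricated seqIds.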
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }

  /**
   * Tests replaying region open markers from the primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, regionMemstoreSize);

    // now replay the region open event that should contain the new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked up
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped, if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }
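
  // A region open event marker carries the primary's store file list and an open seqId.
  // Replaying it on the secondary is expected to drop the memstore contents and any prepared
  // flush snapshot, and pick up the listed files, as asserted above and in the next test.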

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * the flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in the memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain the new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked up
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertEquals(0, newSnapshotSize.getDataSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped, if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped if they have a smaller seqId than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open event of the first open, but with the seqId of the second open.
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
        RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
            regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. If replay does not skip these, the
    // verification below will NOT fail, and the test will fail instead.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }
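
  // The next test checks seqId plumbing for flush markers: after replaying a flush START and
  // its matching COMMIT, the secondary's MVCC read point should have advanced exactly to the
  // flush sequence number, making the newly picked-up flush file visible.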

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class), anyBoolean());

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
        .setFlushSequenceNumber(10)
        .setTableName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getTableDescriptor().getTableName().getName()))
        .setAction(FlushAction.START_FLUSH)
        .setEncodedRegionName(
            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setRegionName(UnsafeByteOperations.unsafeWrap(
            primaryRegion.getRegionInfo().getRegionName()))
        .build());

    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class), anyBoolean());

    secondaryRegion.close();
    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
        any(WALEdit.class), anyBoolean());
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {
    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but the region cannot
   * flush. It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }
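
  // The CANNOT_FLUSH marker written above matters for secondaries: a replica that has disabled
  // reads while waiting for a flush or region open marker can use it as a signal that the
  // primary had nothing to flush, and re-enable reads. The tests below replay such markers and
  // verify that reads are restored.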

  /**
   * Tests the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from the primary region. Replaying a CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // a triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Tests the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from the primary region. Replaying flush start and
   * commit entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from a triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Something's up
    // but can't figure it... and this is only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from the primary region. Replaying flush start and
   * commit entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from a triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from the primary region. Replaying a region open
   * event entry from the primary should restore the reads enabled status in the region and allow
   * the reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: Test that replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }
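
  // Besides WAL markers, a secondary can also pick up new files by re-scanning the region
  // directory directly via refreshStoreFiles(); the next test exercises that path for flushes,
  // compactions, and for dropping the memstore once the refreshed files cover the replayed edits.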
  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: do 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that the memstore is dropped on refresh.
    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush markers, only the data edits
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    // refreshing the store files picks up the primary's flushed files and drops the
    // now-redundant memstore contents
    secondaryRegion.refreshStoreFiles();

    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths can be qualified or not. This does the assertion using String->Path conversion.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

  /**
   * Disables reads on the given region and asserts that reads indeed fail, simulating a replica
   * that is still waiting for a flush or region open marker from the primary.
   */
  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  }
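  // Note (added commentary, hedged): the replay helper above uses SKIP_WAL durability because a
  // replayed mutation must not be re-appended to the secondary's own WAL; the replaySeqId
  // argument stands in for the sequence id the edit carried in the primary's WAL, which is what
  // the secondary uses to order the edit against flush and region event markers.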
  /**
   * Tests replaying a bulk load event marker from the primary region and checks whether the bulk
   * loaded files are picked up by the secondary region replica.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up by the secondary
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Some store files were not loaded: " + storeFileName, storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }
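  // The four tests below (added commentary, hedged): each one replays a marker that refers to
  // store files which no longer exist anywhere, e.g. because the primary compacted them away and
  // the archive directory was since cleaned. In every case the expectation is simply that the
  // replay is tolerated and does not throw, even though the referenced files cannot be found.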
  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/bar")
        .build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/foo")
      .addCompactionOutput("/bar")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
        primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(
        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/foo")
        .build())
      .build());
  }
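  // Helper note (added commentary, hedged): the bulk load test above needs real HFiles on the
  // test filesystem. The helper below writes a minimal single-cell HFile per call, using the
  // supplied bytes as row key, qualifier, and value alike, which is enough for bulkLoadHFiles
  // to accept the file for the given family.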
  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, UUID.randomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContext());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
          KeyValue.Type.Put.getCode(), valueBytes)));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys, flushing
   * every flushInterval records. It then puts numRowsAfterFlush more rows without executing a
   * flush afterwards.
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

  private static HRegion initHRegion(byte[] tableName,
      String callingMethod, byte[]... families) throws IOException {
    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
  }

  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
      WAL wal, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
      isReadOnly, durability, wal, families);
  }
}