/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for
 * secondary region replicas.
 */
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(method);
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = System.currentTimeMillis();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
    primaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
        .toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(
      string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

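  // All of these tests share the same skeleton: write to the primary, then tail the
  // primary's WAL and hand each entry either to a marker-replay method or to
  // replayEdit(). A minimal sketch of that shared loop, using only helpers defined in
  // this class (individual tests differ in which markers they replay or hold back):
  //
  //   reader = createWALReaderForPrimary();
  //   for (WAL.Entry entry; (entry = reader.next()) != null; ) {
  //     FlushDescriptor flushDesc =
  //         WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
  //     if (flushDesc != null) {
  //       secondaryRegion.replayWALFlushMarker(flushDesc, entry.getKey().getSequenceId());
  //     } else {
  //       replayEdit(secondaryRegion, entry);
  //     }
  //   }
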
  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no store files (no flush was executed)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }
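
  // putDataByReplay(..) is a helper defined later in this class. A plausible sketch of
  // what it does, assuming it mirrors replayEdit() below by pushing each Put through
  // batchReplay() with an increasing sequence id (parameter names here are hypothetical):
  //
  //   for (int i = startRow; i < startRow + numRows; i++) {
  //     Put put = new Put(Bytes.toBytes("" + i));
  //     put.setDurability(Durability.SKIP_WAL);
  //     for (byte[] family : families) {
  //       put.addColumn(family, qf, null);
  //     }
  //     MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
  //     region.batchReplay(new MutationReplay[] { mutation }, i + 1);
  //   }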

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region; this should not block on the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
      entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }
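
  // Note: replayEdit() returns the row key parsed back to an int; the replay loops in the
  // tests use this return value (lastReplayed) to track how far replay has progressed.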

  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
        }
        long nonceNum = i / 10;
        mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replay of batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
      NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
        = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and then receive other flush start
   * markers with seqIds equal to, greater than, or less than that of the previous flush start
   * marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on whether the memstore snapshot is droppable
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore without a snapshot;
    // in the non-droppable case there is also more unflushed data (rows 100-200) in the memstores
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert on the region memstore: dropped entirely in the droppable case,
    // unchanged otherwise (we could not drop it)
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }
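
  // clone(..) is used by testReplayFlushStartMarkers above to synthesize flush start
  // markers that differ from the recorded marker only in their flush sequence number.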

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked up
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped if they have a smaller seqId than the
   * seqId of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open event of the first open, but with the seqId of the second open.
    // This way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. If replay does not skip them,
    // the following verification will NOT fail (and the test itself will).
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original values and made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }
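
  // replay(..) and initHRegion(..) used above are helpers defined later in this class.
  // A minimal sketch of replay(), assuming it wraps batchReplay() the same way
  // replayEdit() above does:
  //
  //   private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
  //     put.setDurability(Durability.SKIP_WAL);
  //     MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
  //     region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
  //   }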

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {

    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }


  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
1296    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1297    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
1298    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
1299    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
1300    // but can't figure it... and this is only test that seems to suffer this flush issue.
1301    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying flush start
   * and commit entries should restore the reads-enabled status even when no edits are replayed
   * into the secondary (its memstore stays empty).
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test that replaying FLUSH_START and FLUSH_COMMIT markers restores readsEnabled even though
    // no edits are replayed into the (empty) secondary memstore
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying a region open
   * event entry from the primary should restore the reads-enabled status in the region and allow
   * reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

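  /**
   * Tests that refreshStoreFiles() on the secondary picks up new flushes and compaction results
   * from the primary, and drops the secondary's memstore contents once the covering files are
   * visible.
   */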
  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
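    // Run the discharger chore so compacted-away store files are archived; the secondary's
    // refresh should then see only the compaction output.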
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertEquals(0, secondaryRegion.getMemStoreDataSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertEquals(0, secondaryRegion.getMemStoreDataSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths can be qualified or not. This does the assertion using String->Path conversion.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

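  /**
   * Replays a single Put into the region via batchReplay, skipping the WAL as a replica would.
   */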
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
  }

  /**
   * Tests replaying a bulk load event marker from the primary region. Checks whether the bulk
   * loaded files are picked up by the secondary.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Found store files that aren't loaded: " + storeFileName, storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/123")
        .build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/123")
      .addCompactionOutput("/456")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(
          primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

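  /**
   * Writes a single-cell HFile for the given family to the test filesystem so it can be bulk
   * loaded, and returns its path.
   */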
  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
          KeyValue.Type.Put.getCode(), valueBytes)));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records, indexed with numeric row keys. Flushes
   * every flushInterval records, then puts numRowsAfterFlush more rows without a trailing flush.
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

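  /**
   * Puts rows through the replay path (batchReplay) rather than the normal write path, using
   * increasing replay sequence ids.
   */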
  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

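  /** Convenience overload creating a local region spanning the whole key range. */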
  private static HRegion initHRegion(byte[] tableName,
      String callingMethod, byte[]... families) throws IOException {
    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
  }

  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
      WAL wal, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
      isReadOnly, durability, wal, families);
  }
}