/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for secondary
 * region replicas.
 */
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule
  public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families =
    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
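  // reader over the primary region's WAL; created lazily per test and closed in tearDown()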
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = EnvironmentEdgeManager.currentTime();
    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
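    // two RegionInfo instances for the same region: replica id 0 is the primary, replica id 1
    // is the read-only secondary replica under test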
    primaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

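    // mock the RegionServerServices so regions can be opened without a live cluster; only the
    // pieces HRegion touches (server name, conf, accounting, executor) are stubbed out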
    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string =
      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1)
      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
    when(rss.getExecutorService()).thenReturn(es);
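    // create the primary region on disk, then re-open both replicas via openHRegion so that
    // they are wired to the mocked RegionServerServices above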
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no store files (the secondary cannot flush)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region, which should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

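  /**
   * Replays a single WAL data entry into the given region as a one-element batchReplay, skipping
   * WAL meta edits (flush, compaction and region event markers are handled by callers). Returns
   * the integer row key of the replayed edit; these tests use stringified integers as row keys.
   */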
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

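  /**
   * Opens a reader over the primary region's current WAL file so that its entries can be
   * re-applied to the secondary replica.
   */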
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
  }

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
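      // every run of 10 consecutive mutations shares the same nonce group and nonce (i / 10),
      // so the batch replay below exercises repeated-nonce handling during replay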
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
          long nonceNum = i / 10;
          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
        }
      }
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replay of batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

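    // putDataWithFlushes wrote 300 flushed rows plus 100 unflushed rows (rows 0 through 399),
    // so the last replayed row key should be 399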
    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and then receive other flush start markers
   * with seqIds equal to, greater than, or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by:"
            + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
            .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
      + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
   * memstore edits should be dropped after the flush commit replay since they should be in flushed
   * files
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
    throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
   * memstore edits should not be dropped after the flush commit replay since not every edit will be
   * in flushed files (based on seqId)
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
    throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers. If
   * {@code droppableMemstore} is true, no edits are written after the flush, so the whole memstore
   * can be dropped when the commit marker is replayed; otherwise the edits written after the flush
   * must survive the replayed commit.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
    throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after the flush depending on whether the memstore is droppable
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be data (rows 0-100, plus rows 100-200 in the non-droppable
    // case) sitting in the memstore without a snapshot
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped entirely (droppable) or unchanged otherwise
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

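  /** Returns a copy of the given flush descriptor with its flush sequence number replaced. */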
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build();
  }

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are picked up
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize.getDataSize() == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
                                                         // any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped when they have a smaller seqId than the
   * seqId of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the first region open event, but with the seqId of the second open;
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0))
      .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. If replay does not
    // skip these, the following verification will NOT fail.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode does not write region open / close events to its
   * WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations
        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset all reads should be rejected
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {
    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
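    // scan the primary WAL: only the second flush request (writeFlushRequestWalMarker = true)
    // should have written a CANNOT_FLUSH marker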
1220    reader = createWALReaderForPrimary();
1221    while (true) {
1222      WAL.Entry entry = reader.next();
1223      if (entry == null) {
1224        break;
1225      }
1226      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1227      if (flush != null) {
1228        flushes.add(flush);
1229      }
1230    }
1231
1232    assertEquals(1, flushes.size());
1233    assertNotNull(flushes.get(0));
1234    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1235  }
1236
1237  /**
1238   * Test the case where the secondary region replica is not in reads enabled state because it is
1239   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush
1240   * marker entry should restore the reads enabled status in the region and allow the reads to
1241   * continue.
1242   */
1243  @Test
1244  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1245    disableReads(secondaryRegion);
1246
1247    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1248    // triggered flush restores readsEnabled
1249    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1250    reader = createWALReaderForPrimary();
1251    while (true) {
1252      WAL.Entry entry = reader.next();
1253      if (entry == null) {
1254        break;
1255      }
1256      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1257      if (flush != null) {
1258        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1259      }
1260    }
1261
1262    // now reads should be enabled
1263    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1264  }
1265
1266  /**
1267   * Test the case where the secondary region replica is not in reads enabled state because it is
1268   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1269   * entries should restore the reads enabled status in the region and allow the reads to continue.
1270   */
1271  @Test
1272  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1273    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1274    // from triggered flush restores readsEnabled
1275    disableReads(secondaryRegion);
1276
1277    // put some data in primary
1278    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1279    primaryRegion.flush(true);
1280    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1281    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
1282    // pause the test here and then use WALPrettyPrinter to look at the content. Doing the same
1283    // check before HBASE-15028 I can see all edits flushed to the WAL. Something's up, but I
1284    // can't figure it out... and this is the only test that seems to suffer this flush issue.
1285    // St.Ack 20160201
1286    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1287
1288    reader = createWALReaderForPrimary();
1289    while (true) {
1290      WAL.Entry entry = reader.next();
1291      LOG.info(Objects.toString(entry));
1292      if (entry == null) {
1293        break;
1294      }
1295      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1296      if (flush != null) {
1297        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1298      } else {
1299        replayEdit(secondaryRegion, entry);
1300      }
1301    }
1302
1303    // now reads should be enabled
1304    verifyData(secondaryRegion, 0, 100, cq, families);
1305  }
1306
1307  /**
1308   * Test the case where the secondary region replica is not in reads enabled state because it is
1309   * waiting for a flush or region open marker from primary region. Replaying flush start and
1310   * commit entries should restore the reads enabled status even when the replay finds an empty
1311   * memstore, and allow the reads to continue.
1312   */
1313  @Test
1314  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1315    // Replaying FLUSH_START and FLUSH_COMMIT markers with no edits restores readsEnabled
1316    disableReads(secondaryRegion);
1317
1318    // put some data in primary
1319    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1320    primaryRegion.flush(true);
1321
1322    reader = createWALReaderForPrimary();
1323    while (true) {
1324      WAL.Entry entry = reader.next();
1325      if (entry == null) {
1326        break;
1327      }
1328      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1329      if (flush != null) {
1330        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1331      }
1332    }
1333
1334    // now reads should be enabled
1335    verifyData(secondaryRegion, 0, 100, cq, families);
1336  }
1337
1338  /**
1339   * Test the case where the secondary region replica is not in reads enabled state because it is
1340   * waiting for a flush or region open marker from primary region. Replaying region open event
1341   * entry from primary should restore the reads enabled status in the region and allow the reads to
1342   * continue.
1343   */
1344  @Test
1345  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1346    // Test case 3: Test that replaying region open event markers restores readsEnabled
1347    disableReads(secondaryRegion);
1348
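    // close and reopen the primary; reopening writes a region open event marker to its WAL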
1349    primaryRegion.close();
1350    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1351
1352    reader = createWALReaderForPrimary();
1353    while (true) {
1354      WAL.Entry entry = reader.next();
1355      if (entry == null) {
1356        break;
1357      }
1358
1359      RegionEventDescriptor regionEventDesc =
1360        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1361
1362      if (regionEventDesc != null) {
1363        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1364      }
1365    }
1366
1367    // now reads should be enabled
1368    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1369  }
1370
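  /**
   * Tests that refreshStoreFiles() on the secondary picks up new flush and compaction output from
   * the primary, and that a refresh drops the secondary memstore contents once the picked-up
   * files cover the replayed edits.
   */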
1371  @Test
1372  public void testRefreshStoreFiles() throws IOException {
1373    assertEquals(0, primaryRegion.getStoreFileList(families).size());
1374    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1375
1376    // Test case 1: refresh with an empty region
1377    secondaryRegion.refreshStoreFiles();
1378    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1379
1380    // do one flush
1381    putDataWithFlushes(primaryRegion, 100, 100, 0);
1382    int numRows = 100;
1383
1384    // refresh the store file list, and ensure that the files are picked up.
1385    secondaryRegion.refreshStoreFiles();
1386    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1387      secondaryRegion.getStoreFileList(families));
1388    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1389
1390    LOG.info("-- Verifying edits from secondary");
1391    verifyData(secondaryRegion, 0, numRows, cq, families);
1392
1393    // Test case 2: do 3 more flushes
1394    putDataWithFlushes(primaryRegion, 100, 300, 0);
1395    numRows = 300;
1396
1397    // refresh the store file list, and ensure that the files are picked up.
1398    secondaryRegion.refreshStoreFiles();
1399    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1400      secondaryRegion.getStoreFileList(families));
1401    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1402
1403    LOG.info("-- Verifying edits from secondary");
1404    verifyData(secondaryRegion, 0, numRows, cq, families);
1405
1406    if (FSUtils.WINDOWS) {
1407      // Compaction cannot move files while they are open in the secondary on Windows. Skip the rest.
1408      return;
1409    }
1410
1411    // Test case 3: compact primary files
1412    primaryRegion.compactStores();
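    // let the mocked RegionServerServices return the primary region, then run the compacted
    // files discharger chore so that the compacted-away files are actually removed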
1413    List<HRegion> regions = new ArrayList<>();
1414    regions.add(primaryRegion);
1415    Mockito.doReturn(regions).when(rss).getRegions();
1416    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
1417    cleaner.chore();
1418    secondaryRegion.refreshStoreFiles();
1419    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1420      secondaryRegion.getStoreFileList(families));
1421    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1422
1423    LOG.info("-- Verifying edits from secondary");
1424    verifyData(secondaryRegion, 0, numRows, cq, families);
1425
1426    LOG.info("-- Replaying edits in secondary");
1427
1428    // Test case 4: replay some edits, ensure that memstore is dropped.
1429    assertEquals(0, secondaryRegion.getMemStoreDataSize());
1430    putDataWithFlushes(primaryRegion, 400, 400, 0);
1431    numRows = 400;
1432
1433    reader = createWALReaderForPrimary();
1434    while (true) {
1435      WAL.Entry entry = reader.next();
1436      if (entry == null) {
1437        break;
1438      }
1439      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1440      if (flush != null) {
1441        // skip flush markers so that the replayed edits accumulate in the secondary memstore
1442      } else {
1443        replayEdit(secondaryRegion, entry);
1444      }
1445    }
1446
1447    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
1448
1449    secondaryRegion.refreshStoreFiles();
1450
1451    assertEquals(0, secondaryRegion.getMemStoreDataSize());
1452
1453    LOG.info("-- Verifying edits from primary");
1454    verifyData(primaryRegion, 0, numRows, cq, families);
1455    LOG.info("-- Verifying edits from secondary");
1456    verifyData(secondaryRegion, 0, numRows, cq, families);
1457  }
1458
1459  /**
1460   * Paths can be qualified or unqualified. This asserts equality using scheme-less Path conversion.
1461   */
1462  private void assertPathListsEqual(List<String> list1, List<String> list2) {
1463    List<Path> l1 = new ArrayList<>(list1.size());
1464    for (String path : list1) {
1465      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1466    }
1467    List<Path> l2 = new ArrayList<>(list2.size());
1468    for (String path : list2) {
1469      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1470    }
1471    assertEquals(l1, l2);
1472  }
1473
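  /** Disables reads on the region and verifies that reads indeed fail with an IOException. */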
1474  private void disableReads(HRegion region) {
1475    region.setReadsEnabled(false);
1476    try {
1477      verifyData(region, 0, 1, cq, families);
1478      fail("Should have failed with IOException");
1479    } catch (IOException ex) {
1480      // expected
1481    }
1482  }
1483
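  /** Replays a single Put through the batchReplay path using the given replay sequence id. */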
1484  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1485    put.setDurability(Durability.SKIP_WAL);
1486    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1487    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
1488  }
1489
1490  /**
1491   * Tests replaying bulk load event markers from primary. Checks that the files are picked up.
1492   */
1493  @Test
1494  public void testReplayBulkLoadEvent() throws IOException {
1495    LOG.info("testReplayBulkLoadEvent starts");
1496    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1497
1498    // close the region and open again.
1499    primaryRegion.close();
1500    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1501
1502    // bulk load a file into primary region
1503    byte[] randomValues = new byte[20];
1504    Bytes.random(randomValues);
1505    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1506
1507    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1508    int expectedLoadFileCount = 0;
1509    for (byte[] family : families) {
1510      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1511      expectedLoadFileCount++;
1512    }
1513    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1514
1515    // now replay the edits and the bulk load marker
1516    reader = createWALReaderForPrimary();
1517
1518    LOG.info("-- Replaying edits and region events in secondary");
1519    BulkLoadDescriptor bulkloadEvent = null;
1520    while (true) {
1521      WAL.Entry entry = reader.next();
1522      if (entry == null) {
1523        break;
1524      }
1525      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1526      if (bulkloadEvent != null) {
1527        break;
1528      }
1529    }
1530
1531    // we should have 1 bulk load event
1532    assertNotNull(bulkloadEvent);
1533    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1534
1535    // replay the bulk load event
1536    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1537
1538    List<String> storeFileName = new ArrayList<>();
1539    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1540      storeFileName.addAll(storeDesc.getStoreFileList());
1541    }
1542    // assert that the bulk loaded files are picked up
1543    for (HStore s : secondaryRegion.getStores()) {
1544      for (HStoreFile sf : s.getStorefiles()) {
1545        storeFileName.remove(sf.getPath().getName());
1546      }
1547    }
1548    assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1549
1550    LOG.info("-- Verifying edits from secondary");
1551    for (byte[] family : families) {
1552      assertGet(secondaryRegion, family, randomValues);
1553    }
1554  }
1555
1556  @Test
1557  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1558    // tests replaying flush commit marker, but the flush file has already been compacted
1559    // from primary and also deleted from the archive directory
1560    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
1561      .setFlushSequenceNumber(Long.MAX_VALUE)
1562      .setTableName(UnsafeByteOperations
1563        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1564      .setAction(FlushAction.COMMIT_FLUSH)
1565      .setEncodedRegionName(
1566        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1567      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1568      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1569        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1570        .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build())
1571      .build());
1572  }
1573
1574  @Test
1575  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1576    // tests replaying compaction marker, but the compaction output file has already been compacted
1577    // from primary and also deleted from the archive directory
1578    secondaryRegion
1579      .replayWALCompactionMarker(
1580        CompactionDescriptor.newBuilder()
1581          .setTableName(UnsafeByteOperations
1582            .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1583          .setEncodedRegionName(
1584            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1585          .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123")
1586          .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir")
1587          .setRegionName(
1588            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1589          .build(),
1590        true, true, Long.MAX_VALUE);
1591  }
1592
1593  @Test
1594  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1595    // tests replaying region open event marker, but the region files have already been compacted
1596    // from primary and also deleted from the archive directory
1597    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1598      .setTableName(UnsafeByteOperations
1599        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1600      .setEncodedRegionName(
1601        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1602      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1603      .setEventType(EventType.REGION_OPEN)
1604      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1605      .setLogSequenceNumber(Long.MAX_VALUE)
1606      .addStores(
1607        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1608          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1609      .build());
1610  }
1611
1612  @Test
1613  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1614    // tests replaying bulk load event marker, but the bulk load files have already been compacted
1615    // from primary and also deleted from the archive directory
1616    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1617      .setTableName(
1618        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
1619      .setEncodedRegionName(
1620        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1621      .setBulkloadSeqNum(Long.MAX_VALUE)
1622      .addStores(
1623        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1624          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1625      .build());
1626  }
1627
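  /** Writes a single-cell HFile for the given family and returns its path for bulk loading. */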
1628  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
1629    throws IOException {
1630    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1631    // TODO We need a way to do this without creating files
1632    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
1633    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1634    try {
1635      hFileFactory.withOutputStream(out);
1636      hFileFactory.withFileContext(new HFileContextBuilder().build());
1637      HFile.Writer writer = hFileFactory.create();
1638      try {
1639        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
1640          KeyValue.Type.Put.getCode(), valueBytes)));
1641      } finally {
1642        writer.close();
1643      }
1644    } finally {
1645      out.close();
1646    }
1647    return testFile.toString();
1648  }
1649
1650  /**
1651   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a
1652   * flush every flushInterval number of records. Then it puts numRowsAfterFlush more rows, but
1653   * does not execute a flush afterwards.
1654   */
1655  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
1656    int numRowsAfterFlush) throws IOException {
1657    int start = 0;
1658    for (; start < numRows; start += flushInterval) {
1659      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
1660      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1661      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1662      region.flush(true);
1663    }
1664    LOG.info("-- Writing some more data to primary, not flushing");
1665    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1666  }
1667
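  /**
   * Puts numRows records through the replay path (durability SKIP_WAL), assigning monotonically
   * increasing replay sequence ids.
   */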
1668  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
1669    byte[]... families) throws IOException {
1670    for (int i = startRow; i < startRow + numRows; i++) {
1671      Put put = new Put(Bytes.toBytes("" + i));
1672      put.setDurability(Durability.SKIP_WAL);
1673      for (byte[] family : families) {
1674        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1675      }
1676      replay(region, put, i + 1);
1677    }
1678  }
1679
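  /** Creates a local test region spanning the whole key space with the given families. */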
1680  private static HRegion initHRegion(byte[] tableName, String callingMethod, byte[]... families)
1681    throws IOException {
1682    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1683      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1684  }
1685
1686  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1687    String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1688    byte[]... families) throws IOException {
1689    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1690      isReadOnly, durability, wal, families);
1691  }
1692}