/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, and other events for
 * secondary region replicas.
 */
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniDFSCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(method);
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
    for (byte[] family : families) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    }
    htd = builder.build();

    long time = System.currentTimeMillis();
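    // ChunkCreator is normally initialized when a region server starts; stand-alone region tests
    // like these have to initialize it by hand so the MemStoreLAB can allocate chunks.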
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
    primaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
    secondaryHri =
        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();

    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri);
    walSecondary = wals.getWAL(secondaryHri);

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
    String string =
        org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
    ExecutorService es = new ExecutorService(string);
    es.startExecutorService(string + "-" + string, 1);
    when(rss.getExecutorService()).thenReturn(es);
    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }
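
  // The fixture above gives every test a primary replica (replicaId=0) that writes to its own WAL
  // and a secondary replica (replicaId=1) opened over the same on-disk region data. Tests drive
  // writes and flushes through the primary, then hand-replay the primary's WAL entries into the
  // secondary.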

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
    }
    if (secondaryRegion != null) {
      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non-droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
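    // putDataByReplay is a helper defined later in this class; judging by its uses here, it pushes
    // puts through the replay path with synthetic sequence ids rather than the normal write path.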
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and inspect that it has not flushed
    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (no flush happened on close)
    for (List<HStoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
    // now close the region; this should not hang because of the un-committed flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
  }

  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
    for (Cell cell : entry.getEdit().getCells()) {
      put.add(cell);
    }
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
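    // The rows written by these tests use string-encoded integers as row keys, so the row key
    // itself gives back the index of the last replayed edit.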
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
      AbstractFSWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }
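
  /**
   * A minimal sketch (not used by the tests) of the replay loop the tests below inline: read
   * entries from the primary's WAL, route flush markers to the secondary's replay methods, and
   * push plain data edits through replayEdit(). Individual tests unroll this loop so they can
   * assert on intermediate state, and also dispatch compaction and region event markers.
   */
  private void replayAllEntriesSketch() throws IOException {
    for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }
  }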

  @Test
  public void testBatchReplayWithMultipleNonces() throws IOException {
    try {
      MutationReplay[] mutations = new MutationReplay[100];
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes(i));
        put.setDurability(Durability.SYNC_WAL);
        for (byte[] family : this.families) {
          put.addColumn(family, this.cq, null);
        }
        long nonceNum = i / 10;
        mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
      }
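      // Every ten consecutive puts share a nonce group (i / 10), so this single batchReplay call
      // exercises replay across multiple nonce groups.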
      primaryRegion.batchReplay(mutations, 20);
    } catch (Exception e) {
      String msg = "Error while replaying batch with multiple nonces. ";
      LOG.error(msg, e);
      fail(msg + e.getMessage());
    }
  }

  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
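    // putDataWithFlushes is a helper defined later in this class; judging by its uses, it writes
    // rows to the primary in batches of the first argument, flushing after each batch up to the
    // second argument, then writes the third argument's worth of rows without flushing.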
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc =
          WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (HStore s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          MemStoreSize newMss = store.getFlushableSize();
          assertTrue(mss.getHeapSize() > newMss.getHeapSize());

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (HStore store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (HStore store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater than, or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
        MemStoreSize mss = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(mss.getHeapSize() > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(storeMemstoreSize - newStoreMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId =
        clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId =
        clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * whose seqId is less than that of the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      LOG.debug("Last replayed edit: " + lastReplayed);
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * whose seqId is larger than that of the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after flush depending on droppableMemstore
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore with no snapshot,
    // plus, in the non-droppable case, some more unflushed data (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (HStore store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
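    // Note: an "empty" memstore still accounts for the fixed overhead of its mutable segment, so
    // a dropped memstore shows up as a heap size of MutableSegment.DEEP_OVERHEAD, not zero.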
    if (droppableMemstore) {
      // assert that the memstore is dropped
      assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());
    } else {
      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped if droppable, otherwise unchanged
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    if (droppableMemstore) {
      assertEquals(0, newRegionMemstoreSize);
    } else {
      assertEquals(regionMemstoreSize, newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }

  /**
   * Tests replaying region open markers from the primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, regionMemstoreSize);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize mss = store.getFlushableSize();
    assertEquals(MutableSegment.DEEP_OVERHEAD, mss.getHeapSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (HStore s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    MemStoreSize newSnapshotSize = store.getSnapshotSize();
    assertEquals(0, newSnapshotSize.getDataSize());

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
    assertEquals(0, newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that replayed edits are skipped if they have a smaller seqId than the seqId of the last
   * replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc =
          WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open event of the first open, but with the seqId of the second open;
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. If replay does not skip these, the
    // verification below will NOT fail, and the test will.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc =
          WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().getReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original seqIds and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().getReadPoint();
      long origSeqId = readPoint + 100;

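      // replay(...) is a helper defined later in this class; presumably it wraps the Put in a
      // MutationReplay and applies it through batchReplay with the given sequence id.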
      Put put = new Put(row).addColumn(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getReadPoint(null));

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).addColumn(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   */
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));

    secondaryRegion.close();
    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
      any(WALEdit.class));
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {
    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    assertNotNull(result);
    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying a CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
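    // disableReads is a helper defined later in this class; presumably it flips the region's
    // readsEnabled flag off, so reads are rejected until a replayed marker re-enables them.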
1259    disableReads(secondaryRegion);
1260
1261    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1262    // triggered flush restores readsEnabled
1263    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1264    reader = createWALReaderForPrimary();
1265    while (true) {
1266      WAL.Entry entry = reader.next();
1267      if (entry == null) {
1268        break;
1269      }
1270      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1271      if (flush != null) {
1272        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1273      }
1274    }
1275
1276    // now reads should be enabled
1277    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1278  }
1279
1280  /**
1281   * Test the case where the secondary region replica is not in reads enabled state because it is
1282   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1283   * entries should restore the reads enabled status in the region and allow the reads
1284   * to continue.
1285   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: replaying FLUSH_START and FLUSH_COMMIT markers, as if they came from a
    // triggered flush, restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);
    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
    // needed before HBASE-15028. Not sure what's up. I can see that we have not flushed if I
    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content.
    // Doing the same check before HBASE-15028 I can see all edits flushed to the WAL. Something's
    // up but can't figure it... and this is the only test that seems to suffer this flush issue.
    // St.Ack 20160201
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);

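    // Replay both the flush markers and the plain edits from the primary WAL into the secondary.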
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      LOG.info(Objects.toString(entry));
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying flush start
   * and commit entries, without replaying the edits themselves, should restore the reads-enabled
   * state and make the flushed data readable.
   */
  @Test
  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
    // Test that replaying FLUSH_START and FLUSH_COMMIT markers with an empty secondary memstore
    // (no edits are replayed) restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

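    // Replay only the flush markers; skip the edits so the secondary memstore stays empty.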
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  /**
   * Tests the case where the secondary region replica is not in a reads-enabled state because it
   * is waiting for a flush or region open marker from the primary region. Replaying a region open
   * event entry from the primary should restore the reads-enabled state in the region and allow
   * reads to continue.
   */
  @Test
  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
    // Test case 3: replaying region open event markers restores readsEnabled
    disableReads(secondaryRegion);

    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

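    // Replay the region open event marker that the primary wrote when it was re-opened.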
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }

      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (regionEventDesc != null) {
        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  @Test
  public void testRefreshStoreFiles() throws IOException {
    assertEquals(0, primaryRegion.getStoreFileList(families).size());
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // Test case 1: refresh with an empty region
    secondaryRegion.refreshStoreFiles();
    assertEquals(0, secondaryRegion.getStoreFileList(families).size());

    // do one flush
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: do 3 more flushes
    putDataWithFlushes(primaryRegion, 100, 300, 0);
    numRows = 300;

    // refresh the store file list, and ensure that the files are picked up.
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    if (FSUtils.WINDOWS) {
      // compaction cannot move files while they are open in the secondary on Windows. Skip the
      // remaining test cases.
      return;
    }

    // Test case 3: compact primary files
    primaryRegion.compactStores();
    List<HRegion> regions = new ArrayList<>();
    regions.add(primaryRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
    cleaner.chore();
    secondaryRegion.refreshStoreFiles();
    assertPathListsEqual(primaryRegion.getStoreFileList(families),
      secondaryRegion.getStoreFileList(families));
    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Replaying edits in secondary");

    // Test case 4: replay some edits, ensure that memstore is dropped.
    assertEquals(0, secondaryRegion.getMemStoreDataSize());
    putDataWithFlushes(primaryRegion, 400, 400, 0);
    numRows = 400;

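    // Replay the edits but skip the flush markers, so the edits accumulate in the secondary
    // memstore.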
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        // do not replay flush
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);

    secondaryRegion.refreshStoreFiles();

    assertEquals(0, secondaryRegion.getMemStoreDataSize());

    LOG.info("-- Verifying edits from primary");
    verifyData(primaryRegion, 0, numRows, cq, families);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);
  }

  /**
   * Paths can be qualified or not. This does the assertion after stripping the scheme and
   * authority from each path.
   */
  private void assertPathListsEqual(List<String> list1, List<String> list2) {
    List<Path> l1 = new ArrayList<>(list1.size());
    for (String path : list1) {
      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    List<Path> l2 = new ArrayList<>(list2.size());
    for (String path : list2) {
      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
    }
    assertEquals(l1, l2);
  }

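  /**
   * Puts the region into the not-reads-enabled state a fresh secondary replica starts in, and
   * verifies that reads fail until a flush or region open marker is replayed.
   */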
  private void disableReads(HRegion region) {
    region.setReadsEnabled(false);
    try {
      verifyData(region, 0, 1, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }
  }

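  /**
   * Replays a single Put into the region with the given replay sequence id, skipping the WAL.
   */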
  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
  }

  /**
   * Tests replaying bulk load event markers from the primary region. Checks whether the
   * bulk-loaded files are picked up by the secondary replica.
   */
  @Test
  public void testReplayBulkLoadEvent() throws IOException {
    LOG.info("testReplayBulkLoadEvent starts");
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // bulk load a file into primary region
    Random random = new Random();
    byte[] randomValues = new byte[20];
    random.nextBytes(randomValues);
    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();

    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    int expectedLoadFileCount = 0;
    for (byte[] family : families) {
      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
      expectedLoadFileCount++;
    }
    primaryRegion.bulkLoadHFiles(familyPaths, false, null);

    // now replay the edits and the bulk load marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and region events in secondary");
    BulkLoadDescriptor bulkloadEvent = null;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
      if (bulkloadEvent != null) {
        break;
      }
    }

    // we should have 1 bulk load event
    assertNotNull(bulkloadEvent);
    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());

    // replay the bulk load event
    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);

    List<String> storeFileName = new ArrayList<>();
    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
      storeFileName.addAll(storeDesc.getStoreFileList());
    }
    // assert that the bulk loaded files are picked up
    for (HStore s : secondaryRegion.getStores()) {
      for (HStoreFile sf : s.getStorefiles()) {
        storeFileName.remove(sf.getPath().getName());
      }
    }
    assertTrue("Some store files were not loaded: " + storeFileName, storeFileName.isEmpty());

    LOG.info("-- Verifying edits from secondary");
    for (byte[] family : families) {
      assertGet(secondaryRegion, family, randomValues);
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying a flush commit marker where the flush output file has already been
    // compacted away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(Long.MAX_VALUE)
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setAction(FlushAction.COMMIT_FLUSH)
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getRegionInfo().getRegionName()))
      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addFlushOutput("/foo/baz/123")
        .build())
      .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying a compaction marker where the compaction output file has already been
    // compacted away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
      .addCompactionInput("/123")
      .addCompactionOutput("/456")
      .setStoreHomeDir("/store_home_dir")
      .setRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .build(), true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a region open event marker where the region's files have already been
    // compacted away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
      .setTableName(UnsafeByteOperations.unsafeWrap(
          primaryRegion.getTableDescriptor().getTableName().getName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
      .setEventType(EventType.REGION_OPEN)
      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
      .setLogSequenceNumber(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying a bulk load event marker where the bulk loaded files have already been
    // compacted away on the primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
      .setTableName(
          ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
      .setEncodedRegionName(
          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setBulkloadSeqNum(Long.MAX_VALUE)
      .addStores(StoreDescriptor.newBuilder()
        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
        .setStoreHomeDir("/store_home_dir")
        .addStoreFile("/123")
        .build())
      .build());
  }

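  /**
   * Writes a minimal, single-cell HFile for the given family under testPath and returns its
   * path, for use as bulk load input.
   */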
  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    // TODO We need a way to do this without creating files
    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContext());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
          KeyValue.Type.Put.getCode(), valueBytes)));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return testFile.toString();
  }

  /**
   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys, flushing
   * every flushInterval records. It then puts numRowsAfterFlush more rows, but does not flush
   * afterwards.
   * @throws IOException if a put or flush fails
   */
  private void putDataWithFlushes(HRegion region, int flushInterval,
      int numRows, int numRowsAfterFlush) throws IOException {
    int start = 0;
    for (; start < numRows; start += flushInterval) {
      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
      region.flush(true);
    }
    LOG.info("-- Writing some more data to primary, not flushing");
    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
  }

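  /**
   * Puts numRows rows into the region via the replay path (skipping the WAL), using increasing
   * replay sequence ids.
   */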
  private void putDataByReplay(HRegion region,
      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
    for (int i = startRow; i < startRow + numRows; i++) {
      Put put = new Put(Bytes.toBytes("" + i));
      put.setDurability(Durability.SKIP_WAL);
      for (byte[] family : families) {
        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
      }
      replay(region, put, i + 1);
    }
  }

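  /**
   * Opens a local HRegion over the full key range with the test utility's configuration;
   * delegates to the full overload below.
   */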
  private static HRegion initHRegion(byte[] tableName,
      String callingMethod, byte[]... families) throws IOException {
    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
  }

  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
      WAL wal, byte[]... families) throws IOException {
    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
      isReadOnly, durability, wal, families);
  }
}