001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.junit.jupiter.api.Assertions.assertNull;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028import static org.junit.jupiter.api.Assertions.fail;
029import static org.mockito.ArgumentMatchers.any;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.spy;
032import static org.mockito.Mockito.times;
033import static org.mockito.Mockito.verify;
034import static org.mockito.Mockito.when;
035
036import java.io.FileNotFoundException;
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.List;
040import java.util.Map;
041import java.util.Objects;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FSDataOutputStream;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellBuilderType;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.Durability;
056import org.apache.hadoop.hbase.client.Get;
057import org.apache.hadoop.hbase.client.Put;
058import org.apache.hadoop.hbase.client.RegionInfo;
059import org.apache.hadoop.hbase.client.RegionInfoBuilder;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.executor.ExecutorService;
063import org.apache.hadoop.hbase.executor.ExecutorType;
064import org.apache.hadoop.hbase.io.hfile.HFile;
065import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
066import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
067import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
068import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
069import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
070import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
071import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
072import org.apache.hadoop.hbase.testclassification.LargeTests;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
076import org.apache.hadoop.hbase.util.FSUtils;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.util.Strings;
079import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
080import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
081import org.apache.hadoop.hbase.wal.WAL;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALFactory;
084import org.apache.hadoop.hbase.wal.WALKeyImpl;
085import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
086import org.apache.hadoop.hbase.wal.WALStreamReader;
087import org.junit.jupiter.api.AfterAll;
088import org.junit.jupiter.api.AfterEach;
089import org.junit.jupiter.api.BeforeAll;
090import org.junit.jupiter.api.BeforeEach;
091import org.junit.jupiter.api.Tag;
092import org.junit.jupiter.api.Test;
093import org.junit.jupiter.api.TestInfo;
094import org.mockito.Mockito;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097
098import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
099import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
100
101import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
111
112/**
113 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
114 * region replicas
115 */
116@SuppressWarnings("deprecation")
117@Tag(LargeTests.TAG)
118public class TestHRegionReplayEvents {
119
  /** Class logger; the tests use it to mark the individual replay phases. */
  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionReplayEvents.class);
  /** Name of the test method currently running; assigned in setUp from the JUnit TestInfo. */
  private String name;

  /** Shared testing utility, created once per class; the mini DFS cluster is started on it. */
  private static HBaseTestingUtil TEST_UTIL;

  /** Configuration obtained from TEST_UTIL; re-assigned in setUp before every test. */
  public static Configuration CONF;
  /** String form of the data test directory for this class; assigned in setUp. */
  private String dir;

  /** Column families (cf1, cf2, cf3) that setUp adds to the table descriptor. */
  private byte[][] families =
    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };

  // Test names
  // tableName and method are both derived from the test method name in setUp
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  // qualifier used for every cell written or verified by the tests in view
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  // all of these are (re)created in setUp; reader is reset to null there and closed in tearDown
  private Path rootDir;
  private TableDescriptor htd;
  private RegionServerServices rss;
  private RegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WAL walPrimary, walSecondary;
  private WALStreamReader reader;
146
147  @BeforeAll
148  public static void setUpBeforeClass() throws Exception {
149    TEST_UTIL = new HBaseTestingUtil();
150    TEST_UTIL.startMiniDFSCluster(1);
151  }
152
153  @AfterAll
154  public static void tearDownAfterClass() throws Exception {
155    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
156    TEST_UTIL.cleanupTestDir();
157    TEST_UTIL.shutdownMiniDFSCluster();
158  }
159
160  @BeforeEach
161  public void setUp(TestInfo testInfo) throws Exception {
162    this.name = testInfo.getTestMethod().get().getName();
163    CONF = TEST_UTIL.getConfiguration();
164    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
165    method = name;
166    tableName = Bytes.toBytes(name);
167    rootDir = new Path(dir + method);
168    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
169    method = name;
170    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
171    for (byte[] family : families) {
172      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
173    }
174    htd = builder.build();
175
176    long time = EnvironmentEdgeManager.currentTime();
177    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
178      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
179    primaryHri =
180      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
181    secondaryHri =
182      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();
183
184    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
185    walPrimary = wals.getWAL(primaryHri);
186    walSecondary = wals.getWAL(secondaryHri);
187
188    rss = mock(RegionServerServices.class);
189    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
190    when(rss.getConfiguration()).thenReturn(CONF);
191    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
192    when(rss.getRegionServerSpaceQuotaManager()).thenReturn(null); // or mock it properly
193    when(rss.getFlushRequester()).thenReturn(mock(FlushRequester.class));
194    when(rss.getCompactionRequestor()).thenReturn(mock(CompactionRequester.class));
195    when(rss.getMetrics()).thenReturn(mock(MetricsRegionServer.class));
196    String string =
197      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
198    ExecutorService es = new ExecutorService(string);
199    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1)
200      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
201    when(rss.getExecutorService()).thenReturn(es);
202    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
203    primaryRegion.close();
204    List<HRegion> regions = new ArrayList<>();
205    regions.add(primaryRegion);
206    Mockito.doReturn(regions).when(rss).getRegions();
207
208    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
209    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
210
211    reader = null;
212  }
213
214  @AfterEach
215  public void tearDown() throws Exception {
216    if (reader != null) {
217      reader.close();
218    }
219
220    if (primaryRegion != null) {
221      HBaseTestingUtil.closeRegionAndWAL(primaryRegion);
222    }
223    if (secondaryRegion != null) {
224      HBaseTestingUtil.closeRegionAndWAL(secondaryRegion);
225    }
226
227    EnvironmentEdgeManagerTestHelper.reset();
228  }
229
230  String getName() {
231    return name;
232  }
233
234  // Some of the test cases are as follows:
235  // 1. replay flush start marker again
236  // 2. replay flush with smaller seqId than what is there in memstore snapshot
237  // 3. replay flush with larger seqId than what is there in memstore snapshot
238  // 4. replay flush commit without flush prepare. non droppable memstore
239  // 5. replay flush commit without flush prepare. droppable memstore
240  // 6. replay open region event
241  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
243  // 9. start flush does not prevent region from closing.
244
245  @Test
246  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
247    // load some data and flush ensure that the secondary replica will not execute the flush
248
249    // load some data to secondary by replaying
250    putDataByReplay(secondaryRegion, 0, 1000, cq, families);
251
252    verifyData(secondaryRegion, 0, 1000, cq, families);
253
254    // flush region
255    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
256    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);
257
258    verifyData(secondaryRegion, 0, 1000, cq, families);
259
260    // close the region, and inspect that it has not flushed
261    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
262    // assert that there are no files (due to flush)
263    for (List<HStoreFile> f : files.values()) {
264      assertTrue(f.isEmpty());
265    }
266  }
267
268  /**
269   * Tests a case where we replay only a flush start marker, then the region is closed. This region
270   * should not block indefinitely
271   */
272  @Test
273  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
274    // load some data to primary and flush
275    int start = 0;
276    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
277    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
278    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
279    primaryRegion.flush(true);
280
281    // now replay the edits and the flush marker
282    reader = createWALReaderForPrimary();
283
284    LOG.info("-- Replaying edits and flush events in secondary");
285    while (true) {
286      WAL.Entry entry = reader.next();
287      if (entry == null) {
288        break;
289      }
290      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
291      if (flushDesc != null) {
292        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
293          LOG.info("-- Replaying flush start in secondary");
294          secondaryRegion.replayWALFlushStartMarker(flushDesc);
295        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
296          LOG.info("-- NOT Replaying flush commit in secondary");
297        }
298      } else {
299        replayEdit(secondaryRegion, entry);
300      }
301    }
302
303    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
304    // now close the region which should not cause hold because of un-committed flush
305    secondaryRegion.close();
306
307    // verify that the memstore size is back to what it was
308    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
309  }
310
311  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
312    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
313      return 0; // handled elsewhere
314    }
315    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
316    for (Cell cell : entry.getEdit().getCells())
317      put.add(cell);
318    put.setDurability(Durability.SKIP_WAL);
319    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
320    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
321    return Integer.parseInt(Bytes.toString(put.getRow()));
322  }
323
324  private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
325    return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
326      AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
327  }
328
329  @Test
330  public void testBatchReplayWithMultipleNonces() throws IOException {
331    try {
332      MutationReplay[] mutations = new MutationReplay[100];
333      for (int i = 0; i < 100; i++) {
334        Put put = new Put(Bytes.toBytes(i));
335        put.setDurability(Durability.SYNC_WAL);
336        for (byte[] familly : this.families) {
337          put.addColumn(familly, this.cq, null);
338          long nonceNum = i / 10;
339          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
340        }
341      }
342      primaryRegion.batchReplay(mutations, 20);
343    } catch (Exception e) {
344      String msg = "Error while replay of batch with multiple nonces. ";
345      LOG.error(msg, e);
346      fail(msg + e.getMessage());
347    }
348  }
349
350  @Test
351  public void testReplayFlushesAndCompactions() throws IOException {
352    // initiate a secondary region with some data.
353
354    // load some data to primary and flush. 3 flushes and some more unflushed data
355    putDataWithFlushes(primaryRegion, 100, 300, 100);
356
357    // compaction from primary
358    LOG.info("-- Compacting primary, only 1 store");
359    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);
360
361    // now replay the edits and the flush marker
362    reader = createWALReaderForPrimary();
363
364    LOG.info("-- Replaying edits and flush events in secondary");
365    int lastReplayed = 0;
366    int expectedStoreFileCount = 0;
367    while (true) {
368      WAL.Entry entry = reader.next();
369      if (entry == null) {
370        break;
371      }
372      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
373      CompactionDescriptor compactionDesc =
374        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
375      if (flushDesc != null) {
376        // first verify that everything is replayed and visible before flush event replay
377        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
378        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
379        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
380        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
381        MemStoreSize mss = store.getFlushableSize();
382        long storeSize = store.getSize();
383        long storeSizeUncompressed = store.getStoreSizeUncompressed();
384        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
385          LOG.info("-- Replaying flush start in secondary");
386          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
387          assertNull(result.result);
388          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
389
390          // assert that the store memstore is smaller now
391          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
392          LOG.info("Memstore size reduced by:"
393            + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
394          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
395
396        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
397          LOG.info("-- Replaying flush commit in secondary");
398          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
399
400          // assert that the flush files are picked
401          expectedStoreFileCount++;
402          for (HStore s : secondaryRegion.getStores()) {
403            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
404          }
405          MemStoreSize newMss = store.getFlushableSize();
406          assertTrue(mss.getHeapSize() > newMss.getHeapSize());
407
408          // assert that the region memstore is smaller now
409          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
410          assertTrue(regionMemstoreSize > newRegionMemstoreSize);
411
412          // assert that the store sizes are bigger
413          assertTrue(store.getSize() > storeSize);
414          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
415          assertEquals(store.getSize(), store.getStorefilesSize());
416        }
417        // after replay verify that everything is still visible
418        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
419      } else if (compactionDesc != null) {
420        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
421
422        // assert that the compaction is applied
423        for (HStore store : secondaryRegion.getStores()) {
424          StoreFileTracker sft =
425            StoreFileTrackerFactory.create(CONF, false, store.getStoreContext());
426          if (store.getColumnFamilyName().equals("cf1")) {
427            assertEquals(1, store.getStorefilesCount());
428          } else {
429            assertEquals(expectedStoreFileCount, sft.load().size());
430          }
431        }
432      } else {
433        lastReplayed = replayEdit(secondaryRegion, entry);
434      }
435    }
436
437    assertEquals(400 - 1, lastReplayed);
438    LOG.info("-- Verifying edits from secondary");
439    verifyData(secondaryRegion, 0, 400, cq, families);
440
441    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
442    verifyData(primaryRegion, 0, lastReplayed, cq, families);
443    for (HStore store : primaryRegion.getStores()) {
444      if (store.getColumnFamilyName().equals("cf1")) {
445        assertEquals(1, store.getStorefilesCount());
446      } else {
447        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
448      }
449    }
450  }
451
452  /**
453   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
454   * equal to, greater or less than the previous flush start marker.
455   */
456  @Test
457  public void testReplayFlushStartMarkers() throws IOException {
458    // load some data to primary and flush. 1 flush and some more unflushed data
459    putDataWithFlushes(primaryRegion, 100, 100, 100);
460    int numRows = 200;
461
462    // now replay the edits and the flush marker
463    reader = createWALReaderForPrimary();
464
465    LOG.info("-- Replaying edits and flush events in secondary");
466
467    FlushDescriptor startFlushDesc = null;
468
469    int lastReplayed = 0;
470    while (true) {
471      WAL.Entry entry = reader.next();
472      if (entry == null) {
473        break;
474      }
475      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
476      if (flushDesc != null) {
477        // first verify that everything is replayed and visible before flush event replay
478        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
479        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
480        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
481        MemStoreSize mss = store.getFlushableSize();
482
483        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
484          startFlushDesc = flushDesc;
485          LOG.info("-- Replaying flush start in secondary");
486          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
487          assertNull(result.result);
488          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
489          assertTrue(regionMemstoreSize > 0);
490          assertTrue(mss.getHeapSize() > 0);
491
492          // assert that the store memstore is smaller now
493          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
494          LOG.info("Memstore size reduced by:"
495            + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
496          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
497          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
498
499        }
500        // after replay verify that everything is still visible
501        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
502      } else {
503        lastReplayed = replayEdit(secondaryRegion, entry);
504      }
505    }
506
507    // at this point, there should be some data (rows 0-100) in memstore snapshot
508    // and some more data in memstores (rows 100-200)
509
510    verifyData(secondaryRegion, 0, numRows, cq, families);
511
512    // Test case 1: replay the same flush start marker again
513    LOG.info("-- Replaying same flush start in secondary again");
514    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
515    assertNull(result); // this should return null. Ignoring the flush start marker
516    // assert that we still have prepared flush with the previous setup.
517    assertNotNull(secondaryRegion.getPrepareFlushResult());
518    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
519      startFlushDesc.getFlushSequenceNumber());
520    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
521    verifyData(secondaryRegion, 0, numRows, cq, families);
522
523    // Test case 2: replay a flush start marker with a smaller seqId
524    FlushDescriptor startFlushDescSmallerSeqId =
525      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
526    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
527    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
528    assertNull(result); // this should return null. Ignoring the flush start marker
529    // assert that we still have prepared flush with the previous setup.
530    assertNotNull(secondaryRegion.getPrepareFlushResult());
531    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
532      startFlushDesc.getFlushSequenceNumber());
533    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
534    verifyData(secondaryRegion, 0, numRows, cq, families);
535
536    // Test case 3: replay a flush start marker with a larger seqId
537    FlushDescriptor startFlushDescLargerSeqId =
538      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
539    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
540    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
541    assertNull(result); // this should return null. Ignoring the flush start marker
542    // assert that we still have prepared flush with the previous setup.
543    assertNotNull(secondaryRegion.getPrepareFlushResult());
544    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
545      startFlushDesc.getFlushSequenceNumber());
546    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
547    verifyData(secondaryRegion, 0, numRows, cq, families);
548
549    LOG.info("-- Verifying edits from secondary");
550    verifyData(secondaryRegion, 0, numRows, cq, families);
551
552    LOG.info("-- Verifying edits from primary.");
553    verifyData(primaryRegion, 0, numRows, cq, families);
554  }
555
556  /**
557   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
558   * less than the previous flush start marker.
559   */
560  @Test
561  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
562    // load some data to primary and flush. 2 flushes and some more unflushed data
563    putDataWithFlushes(primaryRegion, 100, 200, 100);
564    int numRows = 300;
565
566    // now replay the edits and the flush marker
567    reader = createWALReaderForPrimary();
568
569    LOG.info("-- Replaying edits and flush events in secondary");
570    FlushDescriptor startFlushDesc = null;
571    FlushDescriptor commitFlushDesc = null;
572
573    int lastReplayed = 0;
574    while (true) {
575      System.out.println(lastReplayed);
576      WAL.Entry entry = reader.next();
577      if (entry == null) {
578        break;
579      }
580      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
581      if (flushDesc != null) {
582        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
583          // don't replay the first flush start marker, hold on to it, replay the second one
584          if (startFlushDesc == null) {
585            startFlushDesc = flushDesc;
586          } else {
587            LOG.info("-- Replaying flush start in secondary");
588            startFlushDesc = flushDesc;
589            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
590            assertNull(result.result);
591          }
592        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
593          // do not replay any flush commit yet
594          if (commitFlushDesc == null) {
595            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
596          }
597        }
598        // after replay verify that everything is still visible
599        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
600      } else {
601        lastReplayed = replayEdit(secondaryRegion, entry);
602      }
603    }
604
605    // at this point, there should be some data (rows 0-200) in memstore snapshot
606    // and some more data in memstores (rows 200-300)
607    verifyData(secondaryRegion, 0, numRows, cq, families);
608
609    // no store files in the region
610    int expectedStoreFileCount = 0;
611    for (HStore s : secondaryRegion.getStores()) {
612      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
613    }
614    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
615
616    // Test case 1: replay the a flush commit marker smaller than what we have prepared
617    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
618      + startFlushDesc);
619    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
620
621    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
622    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
623
624    // assert that the flush files are picked
625    expectedStoreFileCount++;
626    for (HStore s : secondaryRegion.getStores()) {
627      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
628    }
629    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
630    MemStoreSize mss = store.getFlushableSize();
631    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
632
633    // assert that the region memstore is same as before
634    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
635    assertEquals(regionMemstoreSize, newRegionMemstoreSize);
636
637    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
638
639    LOG.info("-- Verifying edits from secondary");
640    verifyData(secondaryRegion, 0, numRows, cq, families);
641
642    LOG.info("-- Verifying edits from primary.");
643    verifyData(primaryRegion, 0, numRows, cq, families);
644  }
645
646  /**
647   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
648   * larger than the previous flush start marker.
649   */
650  @Test
651  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
652    // load some data to primary and flush. 1 flush and some more unflushed data
653    putDataWithFlushes(primaryRegion, 100, 100, 100);
654    int numRows = 200;
655
656    // now replay the edits and the flush marker
657    reader = createWALReaderForPrimary();
658
659    LOG.info("-- Replaying edits and flush events in secondary");
660    FlushDescriptor startFlushDesc = null;
661    FlushDescriptor commitFlushDesc = null;
662
663    int lastReplayed = 0;
664    while (true) {
665      WAL.Entry entry = reader.next();
666      if (entry == null) {
667        break;
668      }
669      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
670      if (flushDesc != null) {
671        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
672          if (startFlushDesc == null) {
673            LOG.info("-- Replaying flush start in secondary");
674            startFlushDesc = flushDesc;
675            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
676            assertNull(result.result);
677          }
678        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
679          // do not replay any flush commit yet
680          // hold on to the flush commit marker but simulate a larger
681          // flush commit seqId
682          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
683            .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build();
684        }
685        // after replay verify that everything is still visible
686        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
687      } else {
688        lastReplayed = replayEdit(secondaryRegion, entry);
689      }
690    }
691
692    // at this point, there should be some data (rows 0-100) in memstore snapshot
693    // and some more data in memstores (rows 100-200)
694    verifyData(secondaryRegion, 0, numRows, cq, families);
695
696    // no store files in the region
697    int expectedStoreFileCount = 0;
698    for (HStore s : secondaryRegion.getStores()) {
699      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
700    }
701    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
702
703    // Test case 1: replay the a flush commit marker larger than what we have prepared
704    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
705      + startFlushDesc);
706    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
707
708    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
709    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
710
711    // assert that the flush files are picked
712    expectedStoreFileCount++;
713    for (HStore s : secondaryRegion.getStores()) {
714      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
715    }
716    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
717    MemStoreSize mss = store.getFlushableSize();
718    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
719
720    // assert that the region memstore is smaller than before, but not empty
721    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
722    assertTrue(newRegionMemstoreSize > 0);
723    assertTrue(regionMemstoreSize > newRegionMemstoreSize);
724
725    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
726
727    LOG.info("-- Verifying edits from secondary");
728    verifyData(secondaryRegion, 0, numRows, cq, families);
729
730    LOG.info("-- Verifying edits from primary.");
731    verifyData(primaryRegion, 0, numRows, cq, families);
732  }
733
734  /**
735   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
736   * memstore edits should be dropped after the flush commit replay since they should be in flushed
737   * files
738   */
739  @Test
740  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
741    throws IOException {
742    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
743  }
744
745  /**
746   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
747   * memstore edits should be not dropped after the flush commit replay since not every edit will be
748   * in flushed files (based on seqId)
749   */
750  @Test
751  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
752    throws IOException {
753    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
754  }
755
756  /**
757   * Tests the case where we receive a flush commit before receiving any flush prepare markers
758   */
759  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
760    throws IOException {
761    // load some data to primary and flush. 1 flushes and some more unflushed data.
762    // write more data after flush depending on whether droppableSnapshot
763    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
764    int numRows = droppableMemstore ? 100 : 200;
765
766    // now replay the edits and the flush marker
767    reader = createWALReaderForPrimary();
768
769    LOG.info("-- Replaying edits and flush events in secondary");
770    FlushDescriptor commitFlushDesc = null;
771
772    int lastReplayed = 0;
773    while (true) {
774      WAL.Entry entry = reader.next();
775      if (entry == null) {
776        break;
777      }
778      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
779      if (flushDesc != null) {
780        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
781          // do not replay flush start marker
782        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
783          commitFlushDesc = flushDesc; // hold on to the flush commit marker
784        }
785        // after replay verify that everything is still visible
786        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
787      } else {
788        lastReplayed = replayEdit(secondaryRegion, entry);
789      }
790    }
791
792    // at this point, there should be some data (rows 0-200) in the memstore without snapshot
793    // and some more data in memstores (rows 100-300)
794    verifyData(secondaryRegion, 0, numRows, cq, families);
795
796    // no store files in the region
797    int expectedStoreFileCount = 0;
798    for (HStore s : secondaryRegion.getStores()) {
799      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
800    }
801    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
802
803    // Test case 1: replay a flush commit marker without start flush marker
804    assertNull(secondaryRegion.getPrepareFlushResult());
805    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
806
807    // ensure all files are visible in secondary
808    for (HStore store : secondaryRegion.getStores()) {
809      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
810    }
811
812    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
813    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
814
815    // assert that the flush files are picked
816    expectedStoreFileCount++;
817    for (HStore s : secondaryRegion.getStores()) {
818      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
819    }
820    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
821    MemStoreSize mss = store.getFlushableSize();
822    if (droppableMemstore) {
823      // assert that the memstore is dropped
824      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
825    } else {
826      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
827    }
828
829    // assert that the region memstore is same as before (we could not drop)
830    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
831    if (droppableMemstore) {
832      assertTrue(0 == newRegionMemstoreSize);
833    } else {
834      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
835    }
836
837    LOG.info("-- Verifying edits from secondary");
838    verifyData(secondaryRegion, 0, numRows, cq, families);
839
840    LOG.info("-- Verifying edits from primary.");
841    verifyData(primaryRegion, 0, numRows, cq, families);
842  }
843
844  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
845    return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build();
846  }
847
848  /**
849   * Tests replaying region open markers from primary region. Checks whether the files are picked up
850   */
851  @Test
852  public void testReplayRegionOpenEvent() throws IOException {
853    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
854    int numRows = 100;
855
856    // close the region and open again.
857    primaryRegion.close();
858    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
859
860    // now replay the edits and the flush marker
861    reader = createWALReaderForPrimary();
862    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
863
864    LOG.info("-- Replaying edits and region events in secondary");
865    while (true) {
866      WAL.Entry entry = reader.next();
867      if (entry == null) {
868        break;
869      }
870      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
871      RegionEventDescriptor regionEventDesc =
872        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
873
874      if (flushDesc != null) {
875        // don't replay flush events
876      } else if (regionEventDesc != null) {
877        regionEvents.add(regionEventDesc);
878      } else {
879        // don't replay edits
880      }
881    }
882
883    // we should have 1 open, 1 close and 1 open event
884    assertEquals(3, regionEvents.size());
885
886    // replay the first region open event.
887    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
888
889    // replay the close event as well
890    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
891
892    // no store files in the region
893    int expectedStoreFileCount = 0;
894    for (HStore s : secondaryRegion.getStores()) {
895      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
896    }
897    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
898    assertTrue(regionMemstoreSize == 0);
899
900    // now replay the region open event that should contain new file locations
901    LOG.info("Testing replaying region open event " + regionEvents.get(2));
902    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
903
904    // assert that the flush files are picked
905    expectedStoreFileCount++;
906    for (HStore s : secondaryRegion.getStores()) {
907      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
908    }
909    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
910    MemStoreSize mss = store.getFlushableSize();
911    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
912
913    // assert that the region memstore is empty
914    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
915    assertTrue(newRegionMemstoreSize == 0);
916
917    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
918                                                         // any
919
920    LOG.info("-- Verifying edits from secondary");
921    verifyData(secondaryRegion, 0, numRows, cq, families);
922
923    LOG.info("-- Verifying edits from primary.");
924    verifyData(primaryRegion, 0, numRows, cq, families);
925  }
926
927  /**
928   * Tests the case where we replay a region open event after a flush start but before receiving
929   * flush commit
930   */
931  @Test
932  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
933    putDataWithFlushes(primaryRegion, 100, 100, 100);
934    int numRows = 200;
935
936    // close the region and open again.
937    primaryRegion.close();
938    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
939
940    // now replay the edits and the flush marker
941    reader = createWALReaderForPrimary();
942    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
943
944    LOG.info("-- Replaying edits and region events in secondary");
945    while (true) {
946      WAL.Entry entry = reader.next();
947      if (entry == null) {
948        break;
949      }
950      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
951      RegionEventDescriptor regionEventDesc =
952        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
953
954      if (flushDesc != null) {
955        // only replay flush start
956        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
957          secondaryRegion.replayWALFlushStartMarker(flushDesc);
958        }
959      } else if (regionEventDesc != null) {
960        regionEvents.add(regionEventDesc);
961      } else {
962        replayEdit(secondaryRegion, entry);
963      }
964    }
965
966    // at this point, there should be some data (rows 0-100) in the memstore snapshot
967    // and some more data in memstores (rows 100-200)
968    verifyData(secondaryRegion, 0, numRows, cq, families);
969
970    // we should have 1 open, 1 close and 1 open event
971    assertEquals(3, regionEvents.size());
972
973    // no store files in the region
974    int expectedStoreFileCount = 0;
975    for (HStore s : secondaryRegion.getStores()) {
976      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
977    }
978
979    // now replay the region open event that should contain new file locations
980    LOG.info("Testing replaying region open event " + regionEvents.get(2));
981    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
982
983    // assert that the flush files are picked
984    expectedStoreFileCount = 2; // two flushes happened
985    for (HStore s : secondaryRegion.getStores()) {
986      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
987    }
988    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
989    MemStoreSize newSnapshotSize = store.getSnapshotSize();
990    assertTrue(newSnapshotSize.getDataSize() == 0);
991
992    // assert that the region memstore is empty
993    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
994    assertTrue(newRegionMemstoreSize == 0);
995
996    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
997                                                         // any
998
999    LOG.info("-- Verifying edits from secondary");
1000    verifyData(secondaryRegion, 0, numRows, cq, families);
1001
1002    LOG.info("-- Verifying edits from primary.");
1003    verifyData(primaryRegion, 0, numRows, cq, families);
1004  }
1005
1006  /**
1007   * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
1008   * of the last replayed region open event.
1009   */
1010  @Test
1011  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
1012    putDataWithFlushes(primaryRegion, 100, 100, 0);
1013    int numRows = 100;
1014
1015    // close the region and open again.
1016    primaryRegion.close();
1017    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1018
1019    // now replay the edits and the flush marker
1020    reader = createWALReaderForPrimary();
1021    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
1022    List<WAL.Entry> edits = Lists.newArrayList();
1023
1024    LOG.info("-- Replaying edits and region events in secondary");
1025    while (true) {
1026      WAL.Entry entry = reader.next();
1027      if (entry == null) {
1028        break;
1029      }
1030      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1031      RegionEventDescriptor regionEventDesc =
1032        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1033
1034      if (flushDesc != null) {
1035        // don't replay flushes
1036      } else if (regionEventDesc != null) {
1037        regionEvents.add(regionEventDesc);
1038      } else {
1039        edits.add(entry);
1040      }
1041    }
1042
1043    // replay the region open of first open, but with the seqid of the second open
1044    // this way non of the flush files will be picked up.
1045    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0))
1046      .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build());
1047
1048    // replay edits from the before region close. If replay does not
1049    // skip these the following verification will NOT fail.
1050    for (WAL.Entry entry : edits) {
1051      replayEdit(secondaryRegion, entry);
1052    }
1053
1054    boolean expectedFail = false;
1055    try {
1056      verifyData(secondaryRegion, 0, numRows, cq, families);
1057    } catch (AssertionError e) {
1058      expectedFail = true; // expected
1059    }
1060    if (!expectedFail) {
1061      fail("Should have failed this verification");
1062    }
1063  }
1064
1065  @Test
1066  public void testReplayFlushSeqIds() throws IOException {
1067    // load some data to primary and flush
1068    int start = 0;
1069    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
1070    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1071    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1072    primaryRegion.flush(true);
1073
1074    // now replay the flush marker
1075    reader = createWALReaderForPrimary();
1076
1077    long flushSeqId = -1;
1078    LOG.info("-- Replaying flush events in secondary");
1079    while (true) {
1080      WAL.Entry entry = reader.next();
1081      if (entry == null) {
1082        break;
1083      }
1084      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1085      if (flushDesc != null) {
1086        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1087          LOG.info("-- Replaying flush start in secondary");
1088          secondaryRegion.replayWALFlushStartMarker(flushDesc);
1089          flushSeqId = flushDesc.getFlushSequenceNumber();
1090        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1091          LOG.info("-- Replaying flush commit in secondary");
1092          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1093          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1094        }
1095      }
1096      // else do not replay
1097    }
1098
1099    // TODO: what to do with this?
1100    // assert that the newly picked up flush file is visible
1101    long readPoint = secondaryRegion.getMVCC().getReadPoint();
1102    assertEquals(flushSeqId, readPoint);
1103
1104    // after replay verify that everything is still visible
1105    verifyData(secondaryRegion, 0, 100, cq, families);
1106  }
1107
1108  @Test
1109  public void testSeqIdsFromReplay() throws IOException {
1110    // test the case where seqId's coming from replayed WALEdits are made persisted with their
1111    // original seqIds and they are made visible through mvcc read point upon replay
1112    String method = name;
1113    byte[] tableName = Bytes.toBytes(method);
1114    byte[] family = Bytes.toBytes("family");
1115
1116    HRegion region = initHRegion(tableName, family);
1117    try {
1118      // replay an entry that is bigger than current read point
1119      long readPoint = region.getMVCC().getReadPoint();
1120      long origSeqId = readPoint + 100;
1121
1122      Put put = new Put(row).addColumn(family, row, row);
1123      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
1124      replay(region, put, origSeqId);
1125
1126      // read point should have advanced to this seqId
1127      assertGet(region, family, row);
1128
1129      // region seqId should have advanced at least to this seqId
1130      assertEquals(origSeqId, region.getReadPoint(null));
1131
1132      // replay an entry that is smaller than current read point
1133      // caution: adding an entry below current read point might cause partial dirty reads. Normal
1134      // replay does not allow reads while replay is going on.
1135      put = new Put(row2).addColumn(family, row2, row2);
1136      put.setDurability(Durability.SKIP_WAL);
1137      replay(region, put, origSeqId - 50);
1138
1139      assertGet(region, family, row2);
1140    } finally {
1141      region.close();
1142    }
1143  }
1144
1145  /**
1146   * Tests that a region opened in secondary mode would not write region open / close events to its
1147   * WAL.
1148   */
1149  @Test
1150  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1151    secondaryRegion.close();
1152    walSecondary = spy(walSecondary);
1153
1154    // test for region open and close
1155    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1156    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1157      any(WALEdit.class));
1158
1159    // test for replay prepare flush
1160    putDataByReplay(secondaryRegion, 0, 10, cq, families);
1161    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
1162      .setFlushSequenceNumber(10)
1163      .setTableName(UnsafeByteOperations
1164        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1165      .setAction(FlushAction.START_FLUSH)
1166      .setEncodedRegionName(
1167        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1168      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1169      .build());
1170
1171    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1172      any(WALEdit.class));
1173
1174    secondaryRegion.close();
1175    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1176      any(WALEdit.class));
1177  }
1178
1179  /**
1180   * Tests the reads enabled flag for the region. When unset all reads should be rejected
1181   */
1182  @Test
1183  public void testRegionReadsEnabledFlag() throws IOException {
1184
1185    putDataByReplay(secondaryRegion, 0, 100, cq, families);
1186
1187    verifyData(secondaryRegion, 0, 100, cq, families);
1188
1189    // now disable reads
1190    secondaryRegion.setReadsEnabled(false);
1191    try {
1192      verifyData(secondaryRegion, 0, 100, cq, families);
1193      fail("Should have failed with IOException");
1194    } catch (IOException ex) {
1195      // expected
1196    }
1197
1198    // verify that we can still replay data
1199    putDataByReplay(secondaryRegion, 100, 100, cq, families);
1200
1201    // now enable reads again
1202    secondaryRegion.setReadsEnabled(true);
1203    verifyData(secondaryRegion, 0, 200, cq, families);
1204  }
1205
1206  /**
1207   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
1208   * It should write the flush request marker instead.
1209   */
1210  @Test
1211  public void testWriteFlushRequestMarker() throws IOException {
1212    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
1213    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
1214    assertNotNull(result);
1215    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1216    assertFalse(result.wroteFlushWalMarker);
1217
1218    // request flush again, but this time with writeFlushRequestWalMarker = true
1219    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1220    assertNotNull(result);
1221    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1222    assertTrue(result.wroteFlushWalMarker);
1223
1224    List<FlushDescriptor> flushes = Lists.newArrayList();
1225    reader = createWALReaderForPrimary();
1226    while (true) {
1227      WAL.Entry entry = reader.next();
1228      if (entry == null) {
1229        break;
1230      }
1231      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1232      if (flush != null) {
1233        flushes.add(flush);
1234      }
1235    }
1236
1237    assertEquals(1, flushes.size());
1238    assertNotNull(flushes.get(0));
1239    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1240  }
1241
1242  /**
1243   * Test the case where the secondary region replica is not in reads enabled state because it is
1244   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush
1245   * marker entry should restore the reads enabled status in the region and allow the reads to
1246   * continue.
1247   */
1248  @Test
1249  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1250    disableReads(secondaryRegion);
1251
1252    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1253    // triggered flush restores readsEnabled
1254    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1255    reader = createWALReaderForPrimary();
1256    while (true) {
1257      WAL.Entry entry = reader.next();
1258      if (entry == null) {
1259        break;
1260      }
1261      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1262      if (flush != null) {
1263        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1264      }
1265    }
1266
1267    // now reads should be enabled
1268    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1269  }
1270
1271  /**
1272   * Test the case where the secondary region replica is not in reads enabled state because it is
1273   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1274   * entries should restore the reads enabled status in the region and allow the reads to continue.
1275   */
1276  @Test
1277  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1278    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1279    // from triggered flush restores readsEnabled
1280    disableReads(secondaryRegion);
1281
1282    // put some data in primary
1283    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1284    primaryRegion.flush(true);
1285    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1286    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
1287    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
1288    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
1289    // but can't figure it... and this is only test that seems to suffer this flush issue.
1290    // St.Ack 20160201
1291    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1292
1293    reader = createWALReaderForPrimary();
1294    while (true) {
1295      WAL.Entry entry = reader.next();
1296      LOG.info(Objects.toString(entry));
1297      if (entry == null) {
1298        break;
1299      }
1300      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1301      if (flush != null) {
1302        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1303      } else {
1304        replayEdit(secondaryRegion, entry);
1305      }
1306    }
1307
1308    // now reads should be enabled
1309    verifyData(secondaryRegion, 0, 100, cq, families);
1310  }
1311
1312  /**
1313   * Test the case where the secondary region replica is not in reads enabled state because it is
1314   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1315   * entries should restore the reads enabled status in the region and allow the reads to continue.
1316   */
1317  @Test
1318  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1319    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1320    // from triggered flush restores readsEnabled
1321    disableReads(secondaryRegion);
1322
1323    // put some data in primary
1324    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1325    primaryRegion.flush(true);
1326
1327    reader = createWALReaderForPrimary();
1328    while (true) {
1329      WAL.Entry entry = reader.next();
1330      if (entry == null) {
1331        break;
1332      }
1333      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1334      if (flush != null) {
1335        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1336      }
1337    }
1338
1339    // now reads should be enabled
1340    verifyData(secondaryRegion, 0, 100, cq, families);
1341  }
1342
1343  /**
1344   * Test the case where the secondary region replica is not in reads enabled state because it is
1345   * waiting for a flush or region open marker from primary region. Replaying region open event
1346   * entry from primary should restore the reads enabled status in the region and allow the reads to
1347   * continue.
1348   */
1349  @Test
1350  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1351    // Test case 3: Test that replaying region open event markers restores readsEnabled
1352    disableReads(secondaryRegion);
1353
1354    primaryRegion.close();
1355    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1356
1357    reader = createWALReaderForPrimary();
1358    while (true) {
1359      WAL.Entry entry = reader.next();
1360      if (entry == null) {
1361        break;
1362      }
1363
1364      RegionEventDescriptor regionEventDesc =
1365        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1366
1367      if (regionEventDesc != null) {
1368        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1369      }
1370    }
1371
1372    // now reads should be enabled
1373    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1374  }
1375
1376  @Test
1377  public void testRefresStoreFiles() throws IOException {
1378    assertEquals(0, primaryRegion.getStoreFileList(families).size());
1379    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1380
1381    // Test case 1: refresh with an empty region
1382    secondaryRegion.refreshStoreFiles();
1383    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1384
1385    // do one flush
1386    putDataWithFlushes(primaryRegion, 100, 100, 0);
1387    int numRows = 100;
1388
1389    // refresh the store file list, and ensure that the files are picked up.
1390    secondaryRegion.refreshStoreFiles();
1391    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1392      secondaryRegion.getStoreFileList(families));
1393    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1394
1395    LOG.info("-- Verifying edits from secondary");
1396    verifyData(secondaryRegion, 0, numRows, cq, families);
1397
1398    // Test case 2: 3 some more flushes
1399    putDataWithFlushes(primaryRegion, 100, 300, 0);
1400    numRows = 300;
1401
1402    // refresh the store file list, and ensure that the files are picked up.
1403    secondaryRegion.refreshStoreFiles();
1404    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1405      secondaryRegion.getStoreFileList(families));
1406    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1407
1408    LOG.info("-- Verifying edits from secondary");
1409    verifyData(secondaryRegion, 0, numRows, cq, families);
1410
1411    if (FSUtils.WINDOWS) {
1412      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1413      return;
1414    }
1415
1416    // Test case 3: compact primary files
1417    primaryRegion.compactStores();
1418    List<HRegion> regions = new ArrayList<>();
1419    regions.add(primaryRegion);
1420    Mockito.doReturn(regions).when(rss).getRegions();
1421    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
1422    cleaner.chore();
1423    secondaryRegion.refreshStoreFiles();
1424    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1425      secondaryRegion.getStoreFileList(families));
1426    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1427
1428    LOG.info("-- Verifying edits from secondary");
1429    verifyData(secondaryRegion, 0, numRows, cq, families);
1430
1431    LOG.info("-- Replaying edits in secondary");
1432
1433    // Test case 4: replay some edits, ensure that memstore is dropped.
1434    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1435    putDataWithFlushes(primaryRegion, 400, 400, 0);
1436    numRows = 400;
1437
1438    reader = createWALReaderForPrimary();
1439    while (true) {
1440      WAL.Entry entry = reader.next();
1441      if (entry == null) {
1442        break;
1443      }
1444      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1445      if (flush != null) {
1446        // do not replay flush
1447      } else {
1448        replayEdit(secondaryRegion, entry);
1449      }
1450    }
1451
1452    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
1453
1454    secondaryRegion.refreshStoreFiles();
1455
1456    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1457
1458    LOG.info("-- Verifying edits from primary");
1459    verifyData(primaryRegion, 0, numRows, cq, families);
1460    LOG.info("-- Verifying edits from secondary");
1461    verifyData(secondaryRegion, 0, numRows, cq, families);
1462  }
1463
1464  /**
1465   * Paths can be qualified or not. This does the assertion using String->Path conversion.
1466   */
1467  private void assertPathListsEqual(List<String> list1, List<String> list2) {
1468    List<Path> l1 = new ArrayList<>(list1.size());
1469    for (String path : list1) {
1470      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1471    }
1472    List<Path> l2 = new ArrayList<>(list2.size());
1473    for (String path : list2) {
1474      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1475    }
1476    assertEquals(l1, l2);
1477  }
1478
1479  private void disableReads(HRegion region) {
1480    region.setReadsEnabled(false);
1481    try {
1482      verifyData(region, 0, 1, cq, families);
1483      fail("Should have failed with IOException");
1484    } catch (IOException ex) {
1485      // expected
1486    }
1487  }
1488
1489  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1490    put.setDurability(Durability.SKIP_WAL);
1491    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1492    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
1493  }
1494
1495  /**
1496   * Tests replaying region open markers from primary region. Checks whether the files are picked up
1497   */
1498  @Test
1499  public void testReplayBulkLoadEvent() throws IOException {
1500    LOG.info("testReplayBulkLoadEvent starts");
1501    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1502
1503    // close the region and open again.
1504    primaryRegion.close();
1505    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1506
1507    // bulk load a file into primary region
1508    byte[] randomValues = new byte[20];
1509    Bytes.random(randomValues);
1510    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1511
1512    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1513    int expectedLoadFileCount = 0;
1514    for (byte[] family : families) {
1515      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1516      expectedLoadFileCount++;
1517    }
1518    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1519
1520    // now replay the edits and the bulk load marker
1521    reader = createWALReaderForPrimary();
1522
1523    LOG.info("-- Replaying edits and region events in secondary");
1524    BulkLoadDescriptor bulkloadEvent = null;
1525    while (true) {
1526      WAL.Entry entry = reader.next();
1527      if (entry == null) {
1528        break;
1529      }
1530      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1531      if (bulkloadEvent != null) {
1532        break;
1533      }
1534    }
1535
1536    // we should have 1 bulk load event
1537    assertTrue(bulkloadEvent != null);
1538    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1539
1540    // replay the bulk load event
1541    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1542
1543    List<String> storeFileNames = new ArrayList<>();
1544    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1545      storeFileNames.addAll(storeDesc.getStoreFileList());
1546    }
1547    // assert that the bulk loaded files are picked
1548    for (HStore s : secondaryRegion.getStores()) {
1549      for (HStoreFile sf : s.getStorefiles()) {
1550        storeFileNames.remove(sf.getPath().getName());
1551      }
1552    }
1553    assertTrue(storeFileNames.isEmpty(), "Found some store file isn't loaded:" + storeFileNames);
1554
1555    LOG.info("-- Verifying edits from secondary");
1556    for (byte[] family : families) {
1557      assertGet(secondaryRegion, family, randomValues);
1558    }
1559  }
1560
1561  @Test
1562  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1563    // tests replaying flush commit marker, but the flush file has already been compacted
1564    // from primary and also deleted from the archive directory
1565    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
1566      .setFlushSequenceNumber(Long.MAX_VALUE)
1567      .setTableName(UnsafeByteOperations
1568        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1569      .setAction(FlushAction.COMMIT_FLUSH)
1570      .setEncodedRegionName(
1571        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1572      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1573      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1574        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1575        .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build())
1576      .build());
1577  }
1578
1579  @Test
1580  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1581    // tests replaying compaction marker, but the compaction output file has already been compacted
1582    // from primary and also deleted from the archive directory
1583    secondaryRegion
1584      .replayWALCompactionMarker(
1585        CompactionDescriptor.newBuilder()
1586          .setTableName(UnsafeByteOperations
1587            .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1588          .setEncodedRegionName(
1589            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1590          .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123")
1591          .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir")
1592          .setRegionName(
1593            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1594          .build(),
1595        true, true, Long.MAX_VALUE);
1596  }
1597
1598  @Test
1599  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1600    // tests replaying region open event marker, but the region files have already been compacted
1601    // from primary and also deleted from the archive directory
1602    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1603      .setTableName(UnsafeByteOperations
1604        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1605      .setEncodedRegionName(
1606        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1607      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1608      .setEventType(EventType.REGION_OPEN)
1609      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1610      .setLogSequenceNumber(Long.MAX_VALUE)
1611      .addStores(
1612        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1613          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1614      .build());
1615  }
1616
1617  @Test
1618  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1619    // tests replaying bulk load event marker, but the bulk load files have already been compacted
1620    // from primary and also deleted from the archive directory
1621    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1622      .setTableName(
1623        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
1624      .setEncodedRegionName(
1625        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1626      .setBulkloadSeqNum(Long.MAX_VALUE)
1627      .addStores(
1628        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1629          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1630      .build());
1631  }
1632
1633  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
1634    throws IOException {
1635    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1636    // TODO We need a way to do this without creating files
1637    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
1638    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1639    try {
1640      hFileFactory.withOutputStream(out);
1641      hFileFactory.withFileContext(new HFileContextBuilder().build());
1642      HFile.Writer writer = hFileFactory.create();
1643      try {
1644        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
1645          .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L)
1646          .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build()));
1647      } finally {
1648        writer.close();
1649      }
1650    } finally {
1651      out.close();
1652    }
1653    return testFile.toString();
1654  }
1655
1656  /**
1657   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush
1658   * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but
1659   * does not execute flush after
1660   */
1661  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
1662    int numRowsAfterFlush) throws IOException {
1663    int start = 0;
1664    for (; start < numRows; start += flushInterval) {
1665      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
1666      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1667      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1668      region.flush(true);
1669    }
1670    LOG.info("-- Writing some more data to primary, not flushing");
1671    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1672  }
1673
1674  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
1675    byte[]... families) throws IOException {
1676    for (int i = startRow; i < startRow + numRows; i++) {
1677      Put put = new Put(Bytes.toBytes("" + i));
1678      put.setDurability(Durability.SKIP_WAL);
1679      for (byte[] family : families) {
1680        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1681      }
1682      replay(region, put, i + 1);
1683    }
1684  }
1685
1686  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
1687    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
1688      HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);
1689  }
1690}