001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029import static org.mockito.ArgumentMatchers.any;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.spy;
032import static org.mockito.Mockito.times;
033import static org.mockito.Mockito.verify;
034import static org.mockito.Mockito.when;
035
036import java.io.FileNotFoundException;
037import java.io.IOException;
038import java.util.ArrayList;
039import java.util.List;
040import java.util.Map;
041import java.util.Objects;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FSDataOutputStream;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.CellBuilderType;
047import org.apache.hadoop.hbase.CellUtil;
048import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
049import org.apache.hadoop.hbase.HBaseClassTestRule;
050import org.apache.hadoop.hbase.HBaseTestingUtil;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.ServerName;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Durability;
057import org.apache.hadoop.hbase.client.Get;
058import org.apache.hadoop.hbase.client.Put;
059import org.apache.hadoop.hbase.client.RegionInfo;
060import org.apache.hadoop.hbase.client.RegionInfoBuilder;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hadoop.hbase.executor.ExecutorService;
064import org.apache.hadoop.hbase.executor.ExecutorType;
065import org.apache.hadoop.hbase.io.hfile.HFile;
066import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
067import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
068import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
069import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
070import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
071import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
072import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
073import org.apache.hadoop.hbase.testclassification.LargeTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
077import org.apache.hadoop.hbase.util.FSUtils;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.util.Strings;
080import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
081import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
082import org.apache.hadoop.hbase.wal.WAL;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALFactory;
085import org.apache.hadoop.hbase.wal.WALKeyImpl;
086import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
087import org.apache.hadoop.hbase.wal.WALStreamReader;
088import org.junit.After;
089import org.junit.AfterClass;
090import org.junit.Before;
091import org.junit.BeforeClass;
092import org.junit.ClassRule;
093import org.junit.Rule;
094import org.junit.Test;
095import org.junit.experimental.categories.Category;
096import org.junit.rules.TestName;
097import org.mockito.Mockito;
098import org.slf4j.Logger;
099import org.slf4j.LoggerFactory;
100
101import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
102import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
103
104import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
114
115/**
116 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
117 * region replicas
118 */
119@SuppressWarnings("deprecation")
120@Category(LargeTests.class)
121public class TestHRegionReplayEvents {
122
123  @ClassRule
124  public static final HBaseClassTestRule CLASS_RULE =
125    HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);
126
127  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
128  @Rule
129  public TestName name = new TestName();
130
131  private static HBaseTestingUtil TEST_UTIL;
132
133  public static Configuration CONF;
134  private String dir;
135
136  private byte[][] families =
137    new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") };
138
139  // Test names
140  protected byte[] tableName;
141  protected String method;
142  protected final byte[] row = Bytes.toBytes("rowA");
143  protected final byte[] row2 = Bytes.toBytes("rowB");
144  protected byte[] cq = Bytes.toBytes("cq");
145
146  // per test fields
147  private Path rootDir;
148  private TableDescriptor htd;
149  private RegionServerServices rss;
150  private RegionInfo primaryHri, secondaryHri;
151  private HRegion primaryRegion, secondaryRegion;
152  private WAL walPrimary, walSecondary;
153  private WALStreamReader reader;
154
155  @BeforeClass
156  public static void setUpBeforeClass() throws Exception {
157    TEST_UTIL = new HBaseTestingUtil();
158    TEST_UTIL.startMiniDFSCluster(1);
159  }
160
161  @AfterClass
162  public static void tearDownAfterClass() throws Exception {
163    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
164    TEST_UTIL.cleanupTestDir();
165    TEST_UTIL.shutdownMiniDFSCluster();
166  }
167
168  @Before
169  public void setUp() throws Exception {
170    CONF = TEST_UTIL.getConfiguration();
171    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
172    method = name.getMethodName();
173    tableName = Bytes.toBytes(name.getMethodName());
174    rootDir = new Path(dir + method);
175    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
176    method = name.getMethodName();
177    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
178    for (byte[] family : families) {
179      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
180    }
181    htd = builder.build();
182
183    long time = EnvironmentEdgeManager.currentTime();
184    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
185      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
186    primaryHri =
187      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
188    secondaryHri =
189      RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();
190
191    WALFactory wals = TestHRegion.createWALFactory(CONF, rootDir);
192    walPrimary = wals.getWAL(primaryHri);
193    walSecondary = wals.getWAL(secondaryHri);
194
195    rss = mock(RegionServerServices.class);
196    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
197    when(rss.getConfiguration()).thenReturn(CONF);
198    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
199    when(rss.getRegionServerSpaceQuotaManager()).thenReturn(null); // or mock it properly
200    when(rss.getFlushRequester()).thenReturn(mock(FlushRequester.class));
201    when(rss.getCompactionRequestor()).thenReturn(mock(CompactionRequester.class));
202    when(rss.getMetrics()).thenReturn(mock(MetricsRegionServer.class));
203    String string =
204      org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
205    ExecutorService es = new ExecutorService(string);
206    es.startExecutorService(es.new ExecutorConfig().setCorePoolSize(1)
207      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
208    when(rss.getExecutorService()).thenReturn(es);
209    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
210    primaryRegion.close();
211    List<HRegion> regions = new ArrayList<>();
212    regions.add(primaryRegion);
213    Mockito.doReturn(regions).when(rss).getRegions();
214
215    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
216    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
217
218    reader = null;
219  }
220
221  @After
222  public void tearDown() throws Exception {
223    if (reader != null) {
224      reader.close();
225    }
226
227    if (primaryRegion != null) {
228      HBaseTestingUtil.closeRegionAndWAL(primaryRegion);
229    }
230    if (secondaryRegion != null) {
231      HBaseTestingUtil.closeRegionAndWAL(secondaryRegion);
232    }
233
234    EnvironmentEdgeManagerTestHelper.reset();
235  }
236
237  String getName() {
238    return name.getMethodName();
239  }
240
241  // Some of the test cases are as follows:
242  // 1. replay flush start marker again
243  // 2. replay flush with smaller seqId than what is there in memstore snapshot
244  // 3. replay flush with larger seqId than what is there in memstore snapshot
245  // 4. replay flush commit without flush prepare. non droppable memstore
246  // 5. replay flush commit without flush prepare. droppable memstore
247  // 6. replay open region event
248  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
250  // 9. start flush does not prevent region from closing.
251
252  @Test
253  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
254    // load some data and flush ensure that the secondary replica will not execute the flush
255
256    // load some data to secondary by replaying
257    putDataByReplay(secondaryRegion, 0, 1000, cq, families);
258
259    verifyData(secondaryRegion, 0, 1000, cq, families);
260
261    // flush region
262    FlushResultImpl flush = (FlushResultImpl) secondaryRegion.flush(true);
263    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);
264
265    verifyData(secondaryRegion, 0, 1000, cq, families);
266
267    // close the region, and inspect that it has not flushed
268    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
269    // assert that there are no files (due to flush)
270    for (List<HStoreFile> f : files.values()) {
271      assertTrue(f.isEmpty());
272    }
273  }
274
275  /**
276   * Tests a case where we replay only a flush start marker, then the region is closed. This region
277   * should not block indefinitely
278   */
279  @Test
280  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
281    // load some data to primary and flush
282    int start = 0;
283    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
284    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
285    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
286    primaryRegion.flush(true);
287
288    // now replay the edits and the flush marker
289    reader = createWALReaderForPrimary();
290
291    LOG.info("-- Replaying edits and flush events in secondary");
292    while (true) {
293      WAL.Entry entry = reader.next();
294      if (entry == null) {
295        break;
296      }
297      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
298      if (flushDesc != null) {
299        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
300          LOG.info("-- Replaying flush start in secondary");
301          secondaryRegion.replayWALFlushStartMarker(flushDesc);
302        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
303          LOG.info("-- NOT Replaying flush commit in secondary");
304        }
305      } else {
306        replayEdit(secondaryRegion, entry);
307      }
308    }
309
310    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
311    // now close the region which should not cause hold because of un-committed flush
312    secondaryRegion.close();
313
314    // verify that the memstore size is back to what it was
315    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
316  }
317
318  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
319    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
320      return 0; // handled elsewhere
321    }
322    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
323    for (Cell cell : entry.getEdit().getCells())
324      put.add(cell);
325    put.setDurability(Durability.SKIP_WAL);
326    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
327    region.batchReplay(new MutationReplay[] { mutation }, entry.getKey().getSequenceId());
328    return Integer.parseInt(Bytes.toString(put.getRow()));
329  }
330
331  private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
332    return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
333      AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
334  }
335
336  @Test
337  public void testBatchReplayWithMultipleNonces() throws IOException {
338    try {
339      MutationReplay[] mutations = new MutationReplay[100];
340      for (int i = 0; i < 100; i++) {
341        Put put = new Put(Bytes.toBytes(i));
342        put.setDurability(Durability.SYNC_WAL);
343        for (byte[] familly : this.families) {
344          put.addColumn(familly, this.cq, null);
345          long nonceNum = i / 10;
346          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
347        }
348      }
349      primaryRegion.batchReplay(mutations, 20);
350    } catch (Exception e) {
351      String msg = "Error while replay of batch with multiple nonces. ";
352      LOG.error(msg, e);
353      fail(msg + e.getMessage());
354    }
355  }
356
357  @Test
358  public void testReplayFlushesAndCompactions() throws IOException {
359    // initiate a secondary region with some data.
360
361    // load some data to primary and flush. 3 flushes and some more unflushed data
362    putDataWithFlushes(primaryRegion, 100, 300, 100);
363
364    // compaction from primary
365    LOG.info("-- Compacting primary, only 1 store");
366    primaryRegion.compactStore(Bytes.toBytes("cf1"), NoLimitThroughputController.INSTANCE);
367
368    // now replay the edits and the flush marker
369    reader = createWALReaderForPrimary();
370
371    LOG.info("-- Replaying edits and flush events in secondary");
372    int lastReplayed = 0;
373    int expectedStoreFileCount = 0;
374    while (true) {
375      WAL.Entry entry = reader.next();
376      if (entry == null) {
377        break;
378      }
379      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
380      CompactionDescriptor compactionDesc =
381        WALEdit.getCompaction(entry.getEdit().getCells().get(0));
382      if (flushDesc != null) {
383        // first verify that everything is replayed and visible before flush event replay
384        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
385        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
386        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
387        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
388        MemStoreSize mss = store.getFlushableSize();
389        long storeSize = store.getSize();
390        long storeSizeUncompressed = store.getStoreSizeUncompressed();
391        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
392          LOG.info("-- Replaying flush start in secondary");
393          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
394          assertNull(result.result);
395          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
396
397          // assert that the store memstore is smaller now
398          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
399          LOG.info("Memstore size reduced by:"
400            + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
401          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
402
403        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
404          LOG.info("-- Replaying flush commit in secondary");
405          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
406
407          // assert that the flush files are picked
408          expectedStoreFileCount++;
409          for (HStore s : secondaryRegion.getStores()) {
410            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
411          }
412          MemStoreSize newMss = store.getFlushableSize();
413          assertTrue(mss.getHeapSize() > newMss.getHeapSize());
414
415          // assert that the region memstore is smaller now
416          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
417          assertTrue(regionMemstoreSize > newRegionMemstoreSize);
418
419          // assert that the store sizes are bigger
420          assertTrue(store.getSize() > storeSize);
421          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
422          assertEquals(store.getSize(), store.getStorefilesSize());
423        }
424        // after replay verify that everything is still visible
425        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
426      } else if (compactionDesc != null) {
427        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
428
429        // assert that the compaction is applied
430        for (HStore store : secondaryRegion.getStores()) {
431          StoreFileTracker sft =
432            StoreFileTrackerFactory.create(CONF, false, store.getStoreContext());
433          if (store.getColumnFamilyName().equals("cf1")) {
434            assertEquals(1, store.getStorefilesCount());
435          } else {
436            assertEquals(expectedStoreFileCount, sft.load().size());
437          }
438        }
439      } else {
440        lastReplayed = replayEdit(secondaryRegion, entry);
441      }
442    }
443
444    assertEquals(400 - 1, lastReplayed);
445    LOG.info("-- Verifying edits from secondary");
446    verifyData(secondaryRegion, 0, 400, cq, families);
447
448    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
449    verifyData(primaryRegion, 0, lastReplayed, cq, families);
450    for (HStore store : primaryRegion.getStores()) {
451      if (store.getColumnFamilyName().equals("cf1")) {
452        assertEquals(1, store.getStorefilesCount());
453      } else {
454        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
455      }
456    }
457  }
458
459  /**
460   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
461   * equal to, greater or less than the previous flush start marker.
462   */
463  @Test
464  public void testReplayFlushStartMarkers() throws IOException {
465    // load some data to primary and flush. 1 flush and some more unflushed data
466    putDataWithFlushes(primaryRegion, 100, 100, 100);
467    int numRows = 200;
468
469    // now replay the edits and the flush marker
470    reader = createWALReaderForPrimary();
471
472    LOG.info("-- Replaying edits and flush events in secondary");
473
474    FlushDescriptor startFlushDesc = null;
475
476    int lastReplayed = 0;
477    while (true) {
478      WAL.Entry entry = reader.next();
479      if (entry == null) {
480        break;
481      }
482      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
483      if (flushDesc != null) {
484        // first verify that everything is replayed and visible before flush event replay
485        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
486        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
487        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
488        MemStoreSize mss = store.getFlushableSize();
489
490        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
491          startFlushDesc = flushDesc;
492          LOG.info("-- Replaying flush start in secondary");
493          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
494          assertNull(result.result);
495          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
496          assertTrue(regionMemstoreSize > 0);
497          assertTrue(mss.getHeapSize() > 0);
498
499          // assert that the store memstore is smaller now
500          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
501          LOG.info("Memstore size reduced by:"
502            + Strings.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
503          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
504          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
505
506        }
507        // after replay verify that everything is still visible
508        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
509      } else {
510        lastReplayed = replayEdit(secondaryRegion, entry);
511      }
512    }
513
514    // at this point, there should be some data (rows 0-100) in memstore snapshot
515    // and some more data in memstores (rows 100-200)
516
517    verifyData(secondaryRegion, 0, numRows, cq, families);
518
519    // Test case 1: replay the same flush start marker again
520    LOG.info("-- Replaying same flush start in secondary again");
521    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
522    assertNull(result); // this should return null. Ignoring the flush start marker
523    // assert that we still have prepared flush with the previous setup.
524    assertNotNull(secondaryRegion.getPrepareFlushResult());
525    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
526      startFlushDesc.getFlushSequenceNumber());
527    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
528    verifyData(secondaryRegion, 0, numRows, cq, families);
529
530    // Test case 2: replay a flush start marker with a smaller seqId
531    FlushDescriptor startFlushDescSmallerSeqId =
532      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
533    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
534    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
535    assertNull(result); // this should return null. Ignoring the flush start marker
536    // assert that we still have prepared flush with the previous setup.
537    assertNotNull(secondaryRegion.getPrepareFlushResult());
538    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
539      startFlushDesc.getFlushSequenceNumber());
540    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
541    verifyData(secondaryRegion, 0, numRows, cq, families);
542
543    // Test case 3: replay a flush start marker with a larger seqId
544    FlushDescriptor startFlushDescLargerSeqId =
545      clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
546    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
547    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
548    assertNull(result); // this should return null. Ignoring the flush start marker
549    // assert that we still have prepared flush with the previous setup.
550    assertNotNull(secondaryRegion.getPrepareFlushResult());
551    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
552      startFlushDesc.getFlushSequenceNumber());
553    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
554    verifyData(secondaryRegion, 0, numRows, cq, families);
555
556    LOG.info("-- Verifying edits from secondary");
557    verifyData(secondaryRegion, 0, numRows, cq, families);
558
559    LOG.info("-- Verifying edits from primary.");
560    verifyData(primaryRegion, 0, numRows, cq, families);
561  }
562
563  /**
564   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
565   * less than the previous flush start marker.
566   */
567  @Test
568  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
569    // load some data to primary and flush. 2 flushes and some more unflushed data
570    putDataWithFlushes(primaryRegion, 100, 200, 100);
571    int numRows = 300;
572
573    // now replay the edits and the flush marker
574    reader = createWALReaderForPrimary();
575
576    LOG.info("-- Replaying edits and flush events in secondary");
577    FlushDescriptor startFlushDesc = null;
578    FlushDescriptor commitFlushDesc = null;
579
580    int lastReplayed = 0;
581    while (true) {
582      System.out.println(lastReplayed);
583      WAL.Entry entry = reader.next();
584      if (entry == null) {
585        break;
586      }
587      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
588      if (flushDesc != null) {
589        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
590          // don't replay the first flush start marker, hold on to it, replay the second one
591          if (startFlushDesc == null) {
592            startFlushDesc = flushDesc;
593          } else {
594            LOG.info("-- Replaying flush start in secondary");
595            startFlushDesc = flushDesc;
596            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
597            assertNull(result.result);
598          }
599        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
600          // do not replay any flush commit yet
601          if (commitFlushDesc == null) {
602            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
603          }
604        }
605        // after replay verify that everything is still visible
606        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
607      } else {
608        lastReplayed = replayEdit(secondaryRegion, entry);
609      }
610    }
611
612    // at this point, there should be some data (rows 0-200) in memstore snapshot
613    // and some more data in memstores (rows 200-300)
614    verifyData(secondaryRegion, 0, numRows, cq, families);
615
616    // no store files in the region
617    int expectedStoreFileCount = 0;
618    for (HStore s : secondaryRegion.getStores()) {
619      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
620    }
621    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
622
623    // Test case 1: replay the a flush commit marker smaller than what we have prepared
624    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
625      + startFlushDesc);
626    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
627
628    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
629    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
630
631    // assert that the flush files are picked
632    expectedStoreFileCount++;
633    for (HStore s : secondaryRegion.getStores()) {
634      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
635    }
636    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
637    MemStoreSize mss = store.getFlushableSize();
638    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
639
640    // assert that the region memstore is same as before
641    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
642    assertEquals(regionMemstoreSize, newRegionMemstoreSize);
643
644    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
645
646    LOG.info("-- Verifying edits from secondary");
647    verifyData(secondaryRegion, 0, numRows, cq, families);
648
649    LOG.info("-- Verifying edits from primary.");
650    verifyData(primaryRegion, 0, numRows, cq, families);
651  }
652
653  /**
654   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
655   * larger than the previous flush start marker.
656   */
657  @Test
658  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
659    // load some data to primary and flush. 1 flush and some more unflushed data
660    putDataWithFlushes(primaryRegion, 100, 100, 100);
661    int numRows = 200;
662
663    // now replay the edits and the flush marker
664    reader = createWALReaderForPrimary();
665
666    LOG.info("-- Replaying edits and flush events in secondary");
667    FlushDescriptor startFlushDesc = null;
668    FlushDescriptor commitFlushDesc = null;
669
670    int lastReplayed = 0;
671    while (true) {
672      WAL.Entry entry = reader.next();
673      if (entry == null) {
674        break;
675      }
676      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
677      if (flushDesc != null) {
678        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
679          if (startFlushDesc == null) {
680            LOG.info("-- Replaying flush start in secondary");
681            startFlushDesc = flushDesc;
682            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
683            assertNull(result.result);
684          }
685        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
686          // do not replay any flush commit yet
687          // hold on to the flush commit marker but simulate a larger
688          // flush commit seqId
689          commitFlushDesc = FlushDescriptor.newBuilder(flushDesc)
690            .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50).build();
691        }
692        // after replay verify that everything is still visible
693        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
694      } else {
695        lastReplayed = replayEdit(secondaryRegion, entry);
696      }
697    }
698
699    // at this point, there should be some data (rows 0-100) in memstore snapshot
700    // and some more data in memstores (rows 100-200)
701    verifyData(secondaryRegion, 0, numRows, cq, families);
702
703    // no store files in the region
704    int expectedStoreFileCount = 0;
705    for (HStore s : secondaryRegion.getStores()) {
706      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
707    }
708    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
709
710    // Test case 1: replay the a flush commit marker larger than what we have prepared
711    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
712      + startFlushDesc);
713    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
714
715    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
716    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
717
718    // assert that the flush files are picked
719    expectedStoreFileCount++;
720    for (HStore s : secondaryRegion.getStores()) {
721      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
722    }
723    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
724    MemStoreSize mss = store.getFlushableSize();
725    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
726
727    // assert that the region memstore is smaller than before, but not empty
728    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
729    assertTrue(newRegionMemstoreSize > 0);
730    assertTrue(regionMemstoreSize > newRegionMemstoreSize);
731
732    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
733
734    LOG.info("-- Verifying edits from secondary");
735    verifyData(secondaryRegion, 0, numRows, cq, families);
736
737    LOG.info("-- Verifying edits from primary.");
738    verifyData(primaryRegion, 0, numRows, cq, families);
739  }
740
741  /**
742   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
743   * memstore edits should be dropped after the flush commit replay since they should be in flushed
744   * files
745   */
746  @Test
747  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
748    throws IOException {
749    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
750  }
751
752  /**
753   * Tests the case where we receive a flush commit before receiving any flush prepare markers. The
754   * memstore edits should be not dropped after the flush commit replay since not every edit will be
755   * in flushed files (based on seqId)
756   */
757  @Test
758  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
759    throws IOException {
760    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
761  }
762
763  /**
764   * Tests the case where we receive a flush commit before receiving any flush prepare markers
765   */
766  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
767    throws IOException {
768    // load some data to primary and flush. 1 flushes and some more unflushed data.
769    // write more data after flush depending on whether droppableSnapshot
770    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
771    int numRows = droppableMemstore ? 100 : 200;
772
773    // now replay the edits and the flush marker
774    reader = createWALReaderForPrimary();
775
776    LOG.info("-- Replaying edits and flush events in secondary");
777    FlushDescriptor commitFlushDesc = null;
778
779    int lastReplayed = 0;
780    while (true) {
781      WAL.Entry entry = reader.next();
782      if (entry == null) {
783        break;
784      }
785      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
786      if (flushDesc != null) {
787        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
788          // do not replay flush start marker
789        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
790          commitFlushDesc = flushDesc; // hold on to the flush commit marker
791        }
792        // after replay verify that everything is still visible
793        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
794      } else {
795        lastReplayed = replayEdit(secondaryRegion, entry);
796      }
797    }
798
799    // at this point, there should be some data (rows 0-200) in the memstore without snapshot
800    // and some more data in memstores (rows 100-300)
801    verifyData(secondaryRegion, 0, numRows, cq, families);
802
803    // no store files in the region
804    int expectedStoreFileCount = 0;
805    for (HStore s : secondaryRegion.getStores()) {
806      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
807    }
808    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
809
810    // Test case 1: replay a flush commit marker without start flush marker
811    assertNull(secondaryRegion.getPrepareFlushResult());
812    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
813
814    // ensure all files are visible in secondary
815    for (HStore store : secondaryRegion.getStores()) {
816      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
817    }
818
819    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
820    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
821
822    // assert that the flush files are picked
823    expectedStoreFileCount++;
824    for (HStore s : secondaryRegion.getStores()) {
825      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
826    }
827    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
828    MemStoreSize mss = store.getFlushableSize();
829    if (droppableMemstore) {
830      // assert that the memstore is dropped
831      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
832    } else {
833      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
834    }
835
836    // assert that the region memstore is same as before (we could not drop)
837    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
838    if (droppableMemstore) {
839      assertTrue(0 == newRegionMemstoreSize);
840    } else {
841      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
842    }
843
844    LOG.info("-- Verifying edits from secondary");
845    verifyData(secondaryRegion, 0, numRows, cq, families);
846
847    LOG.info("-- Verifying edits from primary.");
848    verifyData(primaryRegion, 0, numRows, cq, families);
849  }
850
851  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
852    return FlushDescriptor.newBuilder(flush).setFlushSequenceNumber(flushSeqId).build();
853  }
854
855  /**
856   * Tests replaying region open markers from primary region. Checks whether the files are picked up
857   */
858  @Test
859  public void testReplayRegionOpenEvent() throws IOException {
860    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
861    int numRows = 100;
862
863    // close the region and open again.
864    primaryRegion.close();
865    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
866
867    // now replay the edits and the flush marker
868    reader = createWALReaderForPrimary();
869    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
870
871    LOG.info("-- Replaying edits and region events in secondary");
872    while (true) {
873      WAL.Entry entry = reader.next();
874      if (entry == null) {
875        break;
876      }
877      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
878      RegionEventDescriptor regionEventDesc =
879        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
880
881      if (flushDesc != null) {
882        // don't replay flush events
883      } else if (regionEventDesc != null) {
884        regionEvents.add(regionEventDesc);
885      } else {
886        // don't replay edits
887      }
888    }
889
890    // we should have 1 open, 1 close and 1 open event
891    assertEquals(3, regionEvents.size());
892
893    // replay the first region open event.
894    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
895
896    // replay the close event as well
897    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
898
899    // no store files in the region
900    int expectedStoreFileCount = 0;
901    for (HStore s : secondaryRegion.getStores()) {
902      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
903    }
904    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
905    assertTrue(regionMemstoreSize == 0);
906
907    // now replay the region open event that should contain new file locations
908    LOG.info("Testing replaying region open event " + regionEvents.get(2));
909    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
910
911    // assert that the flush files are picked
912    expectedStoreFileCount++;
913    for (HStore s : secondaryRegion.getStores()) {
914      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
915    }
916    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
917    MemStoreSize mss = store.getFlushableSize();
918    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
919
920    // assert that the region memstore is empty
921    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
922    assertTrue(newRegionMemstoreSize == 0);
923
924    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
925                                                         // any
926
927    LOG.info("-- Verifying edits from secondary");
928    verifyData(secondaryRegion, 0, numRows, cq, families);
929
930    LOG.info("-- Verifying edits from primary.");
931    verifyData(primaryRegion, 0, numRows, cq, families);
932  }
933
934  /**
935   * Tests the case where we replay a region open event after a flush start but before receiving
936   * flush commit
937   */
938  @Test
939  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
940    putDataWithFlushes(primaryRegion, 100, 100, 100);
941    int numRows = 200;
942
943    // close the region and open again.
944    primaryRegion.close();
945    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
946
947    // now replay the edits and the flush marker
948    reader = createWALReaderForPrimary();
949    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
950
951    LOG.info("-- Replaying edits and region events in secondary");
952    while (true) {
953      WAL.Entry entry = reader.next();
954      if (entry == null) {
955        break;
956      }
957      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
958      RegionEventDescriptor regionEventDesc =
959        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
960
961      if (flushDesc != null) {
962        // only replay flush start
963        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
964          secondaryRegion.replayWALFlushStartMarker(flushDesc);
965        }
966      } else if (regionEventDesc != null) {
967        regionEvents.add(regionEventDesc);
968      } else {
969        replayEdit(secondaryRegion, entry);
970      }
971    }
972
973    // at this point, there should be some data (rows 0-100) in the memstore snapshot
974    // and some more data in memstores (rows 100-200)
975    verifyData(secondaryRegion, 0, numRows, cq, families);
976
977    // we should have 1 open, 1 close and 1 open event
978    assertEquals(3, regionEvents.size());
979
980    // no store files in the region
981    int expectedStoreFileCount = 0;
982    for (HStore s : secondaryRegion.getStores()) {
983      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
984    }
985
986    // now replay the region open event that should contain new file locations
987    LOG.info("Testing replaying region open event " + regionEvents.get(2));
988    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
989
990    // assert that the flush files are picked
991    expectedStoreFileCount = 2; // two flushes happened
992    for (HStore s : secondaryRegion.getStores()) {
993      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
994    }
995    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
996    MemStoreSize newSnapshotSize = store.getSnapshotSize();
997    assertTrue(newSnapshotSize.getDataSize() == 0);
998
999    // assert that the region memstore is empty
1000    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
1001    assertTrue(newRegionMemstoreSize == 0);
1002
1003    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if
1004                                                         // any
1005
1006    LOG.info("-- Verifying edits from secondary");
1007    verifyData(secondaryRegion, 0, numRows, cq, families);
1008
1009    LOG.info("-- Verifying edits from primary.");
1010    verifyData(primaryRegion, 0, numRows, cq, families);
1011  }
1012
1013  /**
1014   * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
1015   * of the last replayed region open event.
1016   */
1017  @Test
1018  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
1019    putDataWithFlushes(primaryRegion, 100, 100, 0);
1020    int numRows = 100;
1021
1022    // close the region and open again.
1023    primaryRegion.close();
1024    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1025
1026    // now replay the edits and the flush marker
1027    reader = createWALReaderForPrimary();
1028    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
1029    List<WAL.Entry> edits = Lists.newArrayList();
1030
1031    LOG.info("-- Replaying edits and region events in secondary");
1032    while (true) {
1033      WAL.Entry entry = reader.next();
1034      if (entry == null) {
1035        break;
1036      }
1037      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1038      RegionEventDescriptor regionEventDesc =
1039        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1040
1041      if (flushDesc != null) {
1042        // don't replay flushes
1043      } else if (regionEventDesc != null) {
1044        regionEvents.add(regionEventDesc);
1045      } else {
1046        edits.add(entry);
1047      }
1048    }
1049
1050    // replay the region open of first open, but with the seqid of the second open
1051    // this way non of the flush files will be picked up.
1052    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder(regionEvents.get(0))
1053      .setLogSequenceNumber(regionEvents.get(2).getLogSequenceNumber()).build());
1054
1055    // replay edits from the before region close. If replay does not
1056    // skip these the following verification will NOT fail.
1057    for (WAL.Entry entry : edits) {
1058      replayEdit(secondaryRegion, entry);
1059    }
1060
1061    boolean expectedFail = false;
1062    try {
1063      verifyData(secondaryRegion, 0, numRows, cq, families);
1064    } catch (AssertionError e) {
1065      expectedFail = true; // expected
1066    }
1067    if (!expectedFail) {
1068      fail("Should have failed this verification");
1069    }
1070  }
1071
1072  @Test
1073  public void testReplayFlushSeqIds() throws IOException {
1074    // load some data to primary and flush
1075    int start = 0;
1076    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
1077    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1078    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1079    primaryRegion.flush(true);
1080
1081    // now replay the flush marker
1082    reader = createWALReaderForPrimary();
1083
1084    long flushSeqId = -1;
1085    LOG.info("-- Replaying flush events in secondary");
1086    while (true) {
1087      WAL.Entry entry = reader.next();
1088      if (entry == null) {
1089        break;
1090      }
1091      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1092      if (flushDesc != null) {
1093        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1094          LOG.info("-- Replaying flush start in secondary");
1095          secondaryRegion.replayWALFlushStartMarker(flushDesc);
1096          flushSeqId = flushDesc.getFlushSequenceNumber();
1097        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1098          LOG.info("-- Replaying flush commit in secondary");
1099          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1100          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1101        }
1102      }
1103      // else do not replay
1104    }
1105
1106    // TODO: what to do with this?
1107    // assert that the newly picked up flush file is visible
1108    long readPoint = secondaryRegion.getMVCC().getReadPoint();
1109    assertEquals(flushSeqId, readPoint);
1110
1111    // after replay verify that everything is still visible
1112    verifyData(secondaryRegion, 0, 100, cq, families);
1113  }
1114
1115  @Test
1116  public void testSeqIdsFromReplay() throws IOException {
1117    // test the case where seqId's coming from replayed WALEdits are made persisted with their
1118    // original seqIds and they are made visible through mvcc read point upon replay
1119    String method = name.getMethodName();
1120    byte[] tableName = Bytes.toBytes(method);
1121    byte[] family = Bytes.toBytes("family");
1122
1123    HRegion region = initHRegion(tableName, family);
1124    try {
1125      // replay an entry that is bigger than current read point
1126      long readPoint = region.getMVCC().getReadPoint();
1127      long origSeqId = readPoint + 100;
1128
1129      Put put = new Put(row).addColumn(family, row, row);
1130      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
1131      replay(region, put, origSeqId);
1132
1133      // read point should have advanced to this seqId
1134      assertGet(region, family, row);
1135
1136      // region seqId should have advanced at least to this seqId
1137      assertEquals(origSeqId, region.getReadPoint(null));
1138
1139      // replay an entry that is smaller than current read point
1140      // caution: adding an entry below current read point might cause partial dirty reads. Normal
1141      // replay does not allow reads while replay is going on.
1142      put = new Put(row2).addColumn(family, row2, row2);
1143      put.setDurability(Durability.SKIP_WAL);
1144      replay(region, put, origSeqId - 50);
1145
1146      assertGet(region, family, row2);
1147    } finally {
1148      region.close();
1149    }
1150  }
1151
1152  /**
1153   * Tests that a region opened in secondary mode would not write region open / close events to its
1154   * WAL.
1155   */
1156  @Test
1157  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1158    secondaryRegion.close();
1159    walSecondary = spy(walSecondary);
1160
1161    // test for region open and close
1162    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1163    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1164      any(WALEdit.class));
1165
1166    // test for replay prepare flush
1167    putDataByReplay(secondaryRegion, 0, 10, cq, families);
1168    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
1169      .setFlushSequenceNumber(10)
1170      .setTableName(UnsafeByteOperations
1171        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1172      .setAction(FlushAction.START_FLUSH)
1173      .setEncodedRegionName(
1174        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1175      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1176      .build());
1177
1178    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1179      any(WALEdit.class));
1180
1181    secondaryRegion.close();
1182    verify(walSecondary, times(0)).appendData(any(RegionInfo.class), any(WALKeyImpl.class),
1183      any(WALEdit.class));
1184  }
1185
1186  /**
1187   * Tests the reads enabled flag for the region. When unset all reads should be rejected
1188   */
1189  @Test
1190  public void testRegionReadsEnabledFlag() throws IOException {
1191
1192    putDataByReplay(secondaryRegion, 0, 100, cq, families);
1193
1194    verifyData(secondaryRegion, 0, 100, cq, families);
1195
1196    // now disable reads
1197    secondaryRegion.setReadsEnabled(false);
1198    try {
1199      verifyData(secondaryRegion, 0, 100, cq, families);
1200      fail("Should have failed with IOException");
1201    } catch (IOException ex) {
1202      // expected
1203    }
1204
1205    // verify that we can still replay data
1206    putDataByReplay(secondaryRegion, 100, 100, cq, families);
1207
1208    // now enable reads again
1209    secondaryRegion.setReadsEnabled(true);
1210    verifyData(secondaryRegion, 0, 200, cq, families);
1211  }
1212
1213  /**
1214   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
1215   * It should write the flush request marker instead.
1216   */
1217  @Test
1218  public void testWriteFlushRequestMarker() throws IOException {
1219    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
1220    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
1221    assertNotNull(result);
1222    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1223    assertFalse(result.wroteFlushWalMarker);
1224
1225    // request flush again, but this time with writeFlushRequestWalMarker = true
1226    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1227    assertNotNull(result);
1228    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1229    assertTrue(result.wroteFlushWalMarker);
1230
1231    List<FlushDescriptor> flushes = Lists.newArrayList();
1232    reader = createWALReaderForPrimary();
1233    while (true) {
1234      WAL.Entry entry = reader.next();
1235      if (entry == null) {
1236        break;
1237      }
1238      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1239      if (flush != null) {
1240        flushes.add(flush);
1241      }
1242    }
1243
1244    assertEquals(1, flushes.size());
1245    assertNotNull(flushes.get(0));
1246    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1247  }
1248
1249  /**
1250   * Test the case where the secondary region replica is not in reads enabled state because it is
1251   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH flush
1252   * marker entry should restore the reads enabled status in the region and allow the reads to
1253   * continue.
1254   */
1255  @Test
1256  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1257    disableReads(secondaryRegion);
1258
1259    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1260    // triggered flush restores readsEnabled
1261    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1262    reader = createWALReaderForPrimary();
1263    while (true) {
1264      WAL.Entry entry = reader.next();
1265      if (entry == null) {
1266        break;
1267      }
1268      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1269      if (flush != null) {
1270        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1271      }
1272    }
1273
1274    // now reads should be enabled
1275    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1276  }
1277
1278  /**
1279   * Test the case where the secondary region replica is not in reads enabled state because it is
1280   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1281   * entries should restore the reads enabled status in the region and allow the reads to continue.
1282   */
1283  @Test
1284  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1285    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1286    // from triggered flush restores readsEnabled
1287    disableReads(secondaryRegion);
1288
1289    // put some data in primary
1290    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1291    primaryRegion.flush(true);
1292    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1293    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
1294    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
1295    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
1296    // but can't figure it... and this is only test that seems to suffer this flush issue.
1297    // St.Ack 20160201
1298    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1299
1300    reader = createWALReaderForPrimary();
1301    while (true) {
1302      WAL.Entry entry = reader.next();
1303      LOG.info(Objects.toString(entry));
1304      if (entry == null) {
1305        break;
1306      }
1307      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1308      if (flush != null) {
1309        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1310      } else {
1311        replayEdit(secondaryRegion, entry);
1312      }
1313    }
1314
1315    // now reads should be enabled
1316    verifyData(secondaryRegion, 0, 100, cq, families);
1317  }
1318
1319  /**
1320   * Test the case where the secondary region replica is not in reads enabled state because it is
1321   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1322   * entries should restore the reads enabled status in the region and allow the reads to continue.
1323   */
1324  @Test
1325  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1326    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1327    // from triggered flush restores readsEnabled
1328    disableReads(secondaryRegion);
1329
1330    // put some data in primary
1331    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1332    primaryRegion.flush(true);
1333
1334    reader = createWALReaderForPrimary();
1335    while (true) {
1336      WAL.Entry entry = reader.next();
1337      if (entry == null) {
1338        break;
1339      }
1340      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1341      if (flush != null) {
1342        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1343      }
1344    }
1345
1346    // now reads should be enabled
1347    verifyData(secondaryRegion, 0, 100, cq, families);
1348  }
1349
1350  /**
1351   * Test the case where the secondary region replica is not in reads enabled state because it is
1352   * waiting for a flush or region open marker from primary region. Replaying region open event
1353   * entry from primary should restore the reads enabled status in the region and allow the reads to
1354   * continue.
1355   */
1356  @Test
1357  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1358    // Test case 3: Test that replaying region open event markers restores readsEnabled
1359    disableReads(secondaryRegion);
1360
1361    primaryRegion.close();
1362    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1363
1364    reader = createWALReaderForPrimary();
1365    while (true) {
1366      WAL.Entry entry = reader.next();
1367      if (entry == null) {
1368        break;
1369      }
1370
1371      RegionEventDescriptor regionEventDesc =
1372        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1373
1374      if (regionEventDesc != null) {
1375        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1376      }
1377    }
1378
1379    // now reads should be enabled
1380    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1381  }
1382
1383  @Test
1384  public void testRefresStoreFiles() throws IOException {
1385    assertEquals(0, primaryRegion.getStoreFileList(families).size());
1386    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1387
1388    // Test case 1: refresh with an empty region
1389    secondaryRegion.refreshStoreFiles();
1390    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1391
1392    // do one flush
1393    putDataWithFlushes(primaryRegion, 100, 100, 0);
1394    int numRows = 100;
1395
1396    // refresh the store file list, and ensure that the files are picked up.
1397    secondaryRegion.refreshStoreFiles();
1398    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1399      secondaryRegion.getStoreFileList(families));
1400    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1401
1402    LOG.info("-- Verifying edits from secondary");
1403    verifyData(secondaryRegion, 0, numRows, cq, families);
1404
1405    // Test case 2: 3 some more flushes
1406    putDataWithFlushes(primaryRegion, 100, 300, 0);
1407    numRows = 300;
1408
1409    // refresh the store file list, and ensure that the files are picked up.
1410    secondaryRegion.refreshStoreFiles();
1411    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1412      secondaryRegion.getStoreFileList(families));
1413    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1414
1415    LOG.info("-- Verifying edits from secondary");
1416    verifyData(secondaryRegion, 0, numRows, cq, families);
1417
1418    if (FSUtils.WINDOWS) {
1419      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1420      return;
1421    }
1422
1423    // Test case 3: compact primary files
1424    primaryRegion.compactStores();
1425    List<HRegion> regions = new ArrayList<>();
1426    regions.add(primaryRegion);
1427    Mockito.doReturn(regions).when(rss).getRegions();
1428    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
1429    cleaner.chore();
1430    secondaryRegion.refreshStoreFiles();
1431    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1432      secondaryRegion.getStoreFileList(families));
1433    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1434
1435    LOG.info("-- Verifying edits from secondary");
1436    verifyData(secondaryRegion, 0, numRows, cq, families);
1437
1438    LOG.info("-- Replaying edits in secondary");
1439
1440    // Test case 4: replay some edits, ensure that memstore is dropped.
1441    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1442    putDataWithFlushes(primaryRegion, 400, 400, 0);
1443    numRows = 400;
1444
1445    reader = createWALReaderForPrimary();
1446    while (true) {
1447      WAL.Entry entry = reader.next();
1448      if (entry == null) {
1449        break;
1450      }
1451      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1452      if (flush != null) {
1453        // do not replay flush
1454      } else {
1455        replayEdit(secondaryRegion, entry);
1456      }
1457    }
1458
1459    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
1460
1461    secondaryRegion.refreshStoreFiles();
1462
1463    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1464
1465    LOG.info("-- Verifying edits from primary");
1466    verifyData(primaryRegion, 0, numRows, cq, families);
1467    LOG.info("-- Verifying edits from secondary");
1468    verifyData(secondaryRegion, 0, numRows, cq, families);
1469  }
1470
1471  /**
1472   * Paths can be qualified or not. This does the assertion using String->Path conversion.
1473   */
1474  private void assertPathListsEqual(List<String> list1, List<String> list2) {
1475    List<Path> l1 = new ArrayList<>(list1.size());
1476    for (String path : list1) {
1477      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1478    }
1479    List<Path> l2 = new ArrayList<>(list2.size());
1480    for (String path : list2) {
1481      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1482    }
1483    assertEquals(l1, l2);
1484  }
1485
1486  private void disableReads(HRegion region) {
1487    region.setReadsEnabled(false);
1488    try {
1489      verifyData(region, 0, 1, cq, families);
1490      fail("Should have failed with IOException");
1491    } catch (IOException ex) {
1492      // expected
1493    }
1494  }
1495
1496  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1497    put.setDurability(Durability.SKIP_WAL);
1498    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1499    region.batchReplay(new MutationReplay[] { mutation }, replaySeqId);
1500  }
1501
1502  /**
1503   * Tests replaying region open markers from primary region. Checks whether the files are picked up
1504   */
1505  @Test
1506  public void testReplayBulkLoadEvent() throws IOException {
1507    LOG.info("testReplayBulkLoadEvent starts");
1508    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1509
1510    // close the region and open again.
1511    primaryRegion.close();
1512    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1513
1514    // bulk load a file into primary region
1515    byte[] randomValues = new byte[20];
1516    Bytes.random(randomValues);
1517    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1518
1519    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1520    int expectedLoadFileCount = 0;
1521    for (byte[] family : families) {
1522      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1523      expectedLoadFileCount++;
1524    }
1525    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1526
1527    // now replay the edits and the bulk load marker
1528    reader = createWALReaderForPrimary();
1529
1530    LOG.info("-- Replaying edits and region events in secondary");
1531    BulkLoadDescriptor bulkloadEvent = null;
1532    while (true) {
1533      WAL.Entry entry = reader.next();
1534      if (entry == null) {
1535        break;
1536      }
1537      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1538      if (bulkloadEvent != null) {
1539        break;
1540      }
1541    }
1542
1543    // we should have 1 bulk load event
1544    assertTrue(bulkloadEvent != null);
1545    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1546
1547    // replay the bulk load event
1548    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1549
1550    List<String> storeFileNames = new ArrayList<>();
1551    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1552      storeFileNames.addAll(storeDesc.getStoreFileList());
1553    }
1554    // assert that the bulk loaded files are picked
1555    for (HStore s : secondaryRegion.getStores()) {
1556      for (HStoreFile sf : s.getStorefiles()) {
1557        storeFileNames.remove(sf.getPath().getName());
1558      }
1559    }
1560    assertTrue("Found some store file isn't loaded:" + storeFileNames, storeFileNames.isEmpty());
1561
1562    LOG.info("-- Verifying edits from secondary");
1563    for (byte[] family : families) {
1564      assertGet(secondaryRegion, family, randomValues);
1565    }
1566  }
1567
1568  @Test
1569  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1570    // tests replaying flush commit marker, but the flush file has already been compacted
1571    // from primary and also deleted from the archive directory
1572    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder()
1573      .setFlushSequenceNumber(Long.MAX_VALUE)
1574      .setTableName(UnsafeByteOperations
1575        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1576      .setAction(FlushAction.COMMIT_FLUSH)
1577      .setEncodedRegionName(
1578        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1579      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1580      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1581        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1582        .setStoreHomeDir("/store_home_dir").addFlushOutput("/foo/baz/123").build())
1583      .build());
1584  }
1585
1586  @Test
1587  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1588    // tests replaying compaction marker, but the compaction output file has already been compacted
1589    // from primary and also deleted from the archive directory
1590    secondaryRegion
1591      .replayWALCompactionMarker(
1592        CompactionDescriptor.newBuilder()
1593          .setTableName(UnsafeByteOperations
1594            .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1595          .setEncodedRegionName(
1596            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1597          .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0])).addCompactionInput("/123")
1598          .addCompactionOutput("/456").setStoreHomeDir("/store_home_dir")
1599          .setRegionName(
1600            UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1601          .build(),
1602        true, true, Long.MAX_VALUE);
1603  }
1604
1605  @Test
1606  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1607    // tests replaying region open event marker, but the region files have already been compacted
1608    // from primary and also deleted from the archive directory
1609    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1610      .setTableName(UnsafeByteOperations
1611        .unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1612      .setEncodedRegionName(
1613        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1614      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1615      .setEventType(EventType.REGION_OPEN)
1616      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1617      .setLogSequenceNumber(Long.MAX_VALUE)
1618      .addStores(
1619        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1620          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1621      .build());
1622  }
1623
1624  @Test
1625  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1626    // tests replaying bulk load event marker, but the bulk load files have already been compacted
1627    // from primary and also deleted from the archive directory
1628    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1629      .setTableName(
1630        ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
1631      .setEncodedRegionName(
1632        UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1633      .setBulkloadSeqNum(Long.MAX_VALUE)
1634      .addStores(
1635        StoreDescriptor.newBuilder().setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1636          .setStoreHomeDir("/store_home_dir").addStoreFile("/123").build())
1637      .build());
1638  }
1639
1640  private String createHFileForFamilies(Path testPath, byte[] family, byte[] valueBytes)
1641    throws IOException {
1642    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1643    // TODO We need a way to do this without creating files
1644    Path testFile = new Path(testPath, TEST_UTIL.getRandomUUID().toString());
1645    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1646    try {
1647      hFileFactory.withOutputStream(out);
1648      hFileFactory.withFileContext(new HFileContextBuilder().build());
1649      HFile.Writer writer = hFileFactory.create();
1650      try {
1651        writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
1652          .setRow(valueBytes).setFamily(family).setQualifier(valueBytes).setTimestamp(0L)
1653          .setType(KeyValue.Type.Put.getCode()).setValue(valueBytes).build()));
1654      } finally {
1655        writer.close();
1656      }
1657    } finally {
1658      out.close();
1659    }
1660    return testFile.toString();
1661  }
1662
1663  /**
1664   * Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does a flush
1665   * every flushInterval number of records. Then it puts numRowsAfterFlush number of more rows but
1666   * does not execute flush after
1667   */
1668  private void putDataWithFlushes(HRegion region, int flushInterval, int numRows,
1669    int numRowsAfterFlush) throws IOException {
1670    int start = 0;
1671    for (; start < numRows; start += flushInterval) {
1672      LOG.info("-- Writing some data to primary from " + start + " to " + (start + flushInterval));
1673      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1674      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1675      region.flush(true);
1676    }
1677    LOG.info("-- Writing some more data to primary, not flushing");
1678    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1679  }
1680
1681  private void putDataByReplay(HRegion region, int startRow, int numRows, byte[] qf,
1682    byte[]... families) throws IOException {
1683    for (int i = startRow; i < startRow + numRows; i++) {
1684      Put put = new Put(Bytes.toBytes("" + i));
1685      put.setDurability(Durability.SKIP_WAL);
1686      for (byte[] family : families) {
1687        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1688      }
1689      replay(region, put, i + 1);
1690    }
1691  }
1692
1693  private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
1694    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
1695      HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);
1696  }
1697}