001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.TestHRegion.assertGet;
021import static org.apache.hadoop.hbase.regionserver.TestHRegion.putData;
022import static org.apache.hadoop.hbase.regionserver.TestHRegion.verifyData;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029import static org.mockito.ArgumentMatchers.any;
030import static org.mockito.ArgumentMatchers.anyBoolean;
031import static org.mockito.Mockito.mock;
032import static org.mockito.Mockito.spy;
033import static org.mockito.Mockito.times;
034import static org.mockito.Mockito.verify;
035import static org.mockito.Mockito.when;
036
037import java.io.FileNotFoundException;
038import java.io.IOException;
039import java.util.ArrayList;
040import java.util.List;
041import java.util.Map;
042import java.util.Objects;
043import java.util.Random;
044import java.util.UUID;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.Cell;
049import org.apache.hadoop.hbase.CellUtil;
050import org.apache.hadoop.hbase.HBaseClassTestRule;
051import org.apache.hadoop.hbase.HBaseTestingUtility;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.KeyValue;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
057import org.apache.hadoop.hbase.client.Durability;
058import org.apache.hadoop.hbase.client.Get;
059import org.apache.hadoop.hbase.client.Put;
060import org.apache.hadoop.hbase.client.RegionInfo;
061import org.apache.hadoop.hbase.client.RegionInfoBuilder;
062import org.apache.hadoop.hbase.client.TableDescriptor;
063import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
064import org.apache.hadoop.hbase.executor.ExecutorService;
065import org.apache.hadoop.hbase.io.hfile.HFile;
066import org.apache.hadoop.hbase.io.hfile.HFileContext;
067import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
068import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
069import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
070import org.apache.hadoop.hbase.testclassification.MediumTests;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
074import org.apache.hadoop.hbase.util.FSUtils;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
077import org.apache.hadoop.hbase.wal.WAL;
078import org.apache.hadoop.hbase.wal.WALEdit;
079import org.apache.hadoop.hbase.wal.WALFactory;
080import org.apache.hadoop.hbase.wal.WALKeyImpl;
081import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
082import org.apache.hadoop.util.StringUtils;
083import org.junit.After;
084import org.junit.Before;
085import org.junit.ClassRule;
086import org.junit.Rule;
087import org.junit.Test;
088import org.junit.experimental.categories.Category;
089import org.junit.rules.TestName;
090import org.mockito.Mockito;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
095import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
096
097import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
107
108/**
109 * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
110 * region replicas
111 */
112@Category(MediumTests.class)
113public class TestHRegionReplayEvents {
114
115  @ClassRule
116  public static final HBaseClassTestRule CLASS_RULE =
117      HBaseClassTestRule.forClass(TestHRegionReplayEvents.class);
118
119  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
120  @Rule public TestName name = new TestName();
121
122  private static HBaseTestingUtility TEST_UTIL;
123
124  public static Configuration CONF ;
125  private String dir;
126
127  private byte[][] families = new byte[][] {
128      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};
129
130  // Test names
131  protected byte[] tableName;
132  protected String method;
133  protected final byte[] row = Bytes.toBytes("rowA");
134  protected final byte[] row2 = Bytes.toBytes("rowB");
135  protected byte[] cq = Bytes.toBytes("cq");
136
137  // per test fields
138  private Path rootDir;
139  private TableDescriptor htd;
140  private long time;
141  private RegionServerServices rss;
142  private RegionInfo primaryHri, secondaryHri;
143  private HRegion primaryRegion, secondaryRegion;
144  private WALFactory wals;
145  private WAL walPrimary, walSecondary;
146  private WAL.Reader reader;
147
148  @Before
149  public void setup() throws IOException {
150    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
151    CONF = TEST_UTIL.getConfiguration();
152    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
153    method = name.getMethodName();
154    tableName = Bytes.toBytes(name.getMethodName());
155    rootDir = new Path(dir + method);
156    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
157    method = name.getMethodName();
158    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(method));
159    for (byte[] family : families) {
160      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
161    }
162    htd = builder.build();
163
164    time = System.currentTimeMillis();
165    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
166    primaryHri =
167        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(0).build();
168    secondaryHri =
169        RegionInfoBuilder.newBuilder(htd.getTableName()).setRegionId(time).setReplicaId(1).build();
170
171    wals = TestHRegion.createWALFactory(CONF, rootDir);
172    walPrimary = wals.getWAL(primaryHri);
173    walSecondary = wals.getWAL(secondaryHri);
174
175    rss = mock(RegionServerServices.class);
176    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
177    when(rss.getConfiguration()).thenReturn(CONF);
178    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
179    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
180        .toString();
181    ExecutorService es = new ExecutorService(string);
182    es.startExecutorService(
183      string+"-"+string, 1);
184    when(rss.getExecutorService()).thenReturn(es);
185    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
186    primaryRegion.close();
187    List<HRegion> regions = new ArrayList<>();
188    regions.add(primaryRegion);
189    Mockito.doReturn(regions).when(rss).getRegions();
190
191    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
192    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
193
194    reader = null;
195  }
196
197  @After
198  public void tearDown() throws Exception {
199    if (reader != null) {
200      reader.close();
201    }
202
203    if (primaryRegion != null) {
204      HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
205    }
206    if (secondaryRegion != null) {
207      HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
208    }
209
210    EnvironmentEdgeManagerTestHelper.reset();
211    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
212    TEST_UTIL.cleanupTestDir();
213  }
214
215  String getName() {
216    return name.getMethodName();
217  }
218
219  // Some of the test cases are as follows:
220  // 1. replay flush start marker again
221  // 2. replay flush with smaller seqId than what is there in memstore snapshot
222  // 3. replay flush with larger seqId than what is there in memstore snapshot
223  // 4. replay flush commit without flush prepare. non droppable memstore
224  // 5. replay flush commit without flush prepare. droppable memstore
225  // 6. replay open region event
226  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
228  // 9. start flush does not prevent region from closing.
229
230  @Test
231  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
232    // load some data and flush ensure that the secondary replica will not execute the flush
233
234    // load some data to secondary by replaying
235    putDataByReplay(secondaryRegion, 0, 1000, cq, families);
236
237    verifyData(secondaryRegion, 0, 1000, cq, families);
238
239    // flush region
240    FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
241    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH, flush.result);
242
243    verifyData(secondaryRegion, 0, 1000, cq, families);
244
245    // close the region, and inspect that it has not flushed
246    Map<byte[], List<HStoreFile>> files = secondaryRegion.close(false);
247    // assert that there are no files (due to flush)
248    for (List<HStoreFile> f : files.values()) {
249      assertTrue(f.isEmpty());
250    }
251  }
252
253  /**
254   * Tests a case where we replay only a flush start marker, then the region is closed. This region
255   * should not block indefinitely
256   */
257  @Test
258  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
259    // load some data to primary and flush
260    int start = 0;
261    LOG.info("-- Writing some data to primary from " +  start + " to " + (start+100));
262    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
263    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
264    primaryRegion.flush(true);
265
266    // now replay the edits and the flush marker
267    reader = createWALReaderForPrimary();
268
269    LOG.info("-- Replaying edits and flush events in secondary");
270    while (true) {
271      WAL.Entry entry = reader.next();
272      if (entry == null) {
273        break;
274      }
275      FlushDescriptor flushDesc
276        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
277      if (flushDesc != null) {
278        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
279          LOG.info("-- Replaying flush start in secondary");
280          secondaryRegion.replayWALFlushStartMarker(flushDesc);
281        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
282          LOG.info("-- NOT Replaying flush commit in secondary");
283        }
284      } else {
285        replayEdit(secondaryRegion, entry);
286      }
287    }
288
289    assertTrue(rss.getRegionServerAccounting().getGlobalMemStoreDataSize() > 0);
290    // now close the region which should not cause hold because of un-committed flush
291    secondaryRegion.close();
292
293    // verify that the memstore size is back to what it was
294    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemStoreDataSize());
295  }
296
297  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
298    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
299      return 0; // handled elsewhere
300    }
301    Put put = new Put(CellUtil.cloneRow(entry.getEdit().getCells().get(0)));
302    for (Cell cell : entry.getEdit().getCells()) put.add(cell);
303    put.setDurability(Durability.SKIP_WAL);
304    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
305    region.batchReplay(new MutationReplay[] {mutation},
306      entry.getKey().getSequenceId());
307    return Integer.parseInt(Bytes.toString(put.getRow()));
308  }
309
310  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
311    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
312      AbstractFSWALProvider.getCurrentFileName(walPrimary),
313      TEST_UTIL.getConfiguration());
314  }
315
316  @Test
317  public void testBatchReplayWithMultipleNonces() throws IOException {
318    try {
319      MutationReplay[] mutations = new MutationReplay[100];
320      for (int i = 0; i < 100; i++) {
321        Put put = new Put(Bytes.toBytes(i));
322        put.setDurability(Durability.SYNC_WAL);
323        for (byte[] familly : this.families) {
324          put.addColumn(familly, this.cq, null);
325          long nonceNum = i / 10;
326          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
327        }
328      }
329      primaryRegion.batchReplay(mutations, 20);
330    } catch (Exception e) {
331      String msg = "Error while replay of batch with multiple nonces. ";
332      LOG.error(msg, e);
333      fail(msg + e.getMessage());
334    }
335  }
336
337  @Test
338  public void testReplayFlushesAndCompactions() throws IOException {
339    // initiate a secondary region with some data.
340
341    // load some data to primary and flush. 3 flushes and some more unflushed data
342    putDataWithFlushes(primaryRegion, 100, 300, 100);
343
344    // compaction from primary
345    LOG.info("-- Compacting primary, only 1 store");
346    primaryRegion.compactStore(Bytes.toBytes("cf1"),
347      NoLimitThroughputController.INSTANCE);
348
349    // now replay the edits and the flush marker
350    reader = createWALReaderForPrimary();
351
352    LOG.info("-- Replaying edits and flush events in secondary");
353    int lastReplayed = 0;
354    int expectedStoreFileCount = 0;
355    while (true) {
356      WAL.Entry entry = reader.next();
357      if (entry == null) {
358        break;
359      }
360      FlushDescriptor flushDesc
361      = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
362      CompactionDescriptor compactionDesc
363      = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
364      if (flushDesc != null) {
365        // first verify that everything is replayed and visible before flush event replay
366        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
367        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
368        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
369        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
370        MemStoreSize mss = store.getFlushableSize();
371        long storeSize = store.getSize();
372        long storeSizeUncompressed = store.getStoreSizeUncompressed();
373        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
374          LOG.info("-- Replaying flush start in secondary");
375          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
376          assertNull(result.result);
377          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
378
379          // assert that the store memstore is smaller now
380          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
381          LOG.info("Memstore size reduced by:"
382              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
383          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
384
385        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
386          LOG.info("-- Replaying flush commit in secondary");
387          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
388
389          // assert that the flush files are picked
390          expectedStoreFileCount++;
391          for (HStore s : secondaryRegion.getStores()) {
392            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
393          }
394          MemStoreSize newMss = store.getFlushableSize();
395          assertTrue(mss.getHeapSize() > newMss.getHeapSize());
396
397          // assert that the region memstore is smaller now
398          long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
399          assertTrue(regionMemstoreSize > newRegionMemstoreSize);
400
401          // assert that the store sizes are bigger
402          assertTrue(store.getSize() > storeSize);
403          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
404          assertEquals(store.getSize(), store.getStorefilesSize());
405        }
406        // after replay verify that everything is still visible
407        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
408      } else if (compactionDesc != null) {
409        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
410
411        // assert that the compaction is applied
412        for (HStore store : secondaryRegion.getStores()) {
413          if (store.getColumnFamilyName().equals("cf1")) {
414            assertEquals(1, store.getStorefilesCount());
415          } else {
416            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
417          }
418        }
419      } else {
420        lastReplayed = replayEdit(secondaryRegion, entry);;
421      }
422    }
423
424    assertEquals(400-1, lastReplayed);
425    LOG.info("-- Verifying edits from secondary");
426    verifyData(secondaryRegion, 0, 400, cq, families);
427
428    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
429    verifyData(primaryRegion, 0, lastReplayed, cq, families);
430    for (HStore store : primaryRegion.getStores()) {
431      if (store.getColumnFamilyName().equals("cf1")) {
432        assertEquals(1, store.getStorefilesCount());
433      } else {
434        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
435      }
436    }
437  }
438
439  /**
440   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
441   * equal to, greater or less than the previous flush start marker.
442   */
443  @Test
444  public void testReplayFlushStartMarkers() throws IOException {
445    // load some data to primary and flush. 1 flush and some more unflushed data
446    putDataWithFlushes(primaryRegion, 100, 100, 100);
447    int numRows = 200;
448
449    // now replay the edits and the flush marker
450    reader =  createWALReaderForPrimary();
451
452    LOG.info("-- Replaying edits and flush events in secondary");
453
454    FlushDescriptor startFlushDesc = null;
455
456    int lastReplayed = 0;
457    while (true) {
458      WAL.Entry entry = reader.next();
459      if (entry == null) {
460        break;
461      }
462      FlushDescriptor flushDesc
463      = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
464      if (flushDesc != null) {
465        // first verify that everything is replayed and visible before flush event replay
466        HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
467        long storeMemstoreSize = store.getMemStoreSize().getHeapSize();
468        long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
469        MemStoreSize mss = store.getFlushableSize();
470
471        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
472          startFlushDesc = flushDesc;
473          LOG.info("-- Replaying flush start in secondary");
474          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
475          assertNull(result.result);
476          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
477          assertTrue(regionMemstoreSize > 0);
478          assertTrue(mss.getHeapSize() > 0);
479
480          // assert that the store memstore is smaller now
481          long newStoreMemstoreSize = store.getMemStoreSize().getHeapSize();
482          LOG.info("Memstore size reduced by:"
483              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
484          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
485          verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
486
487        }
488        // after replay verify that everything is still visible
489        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
490      } else {
491        lastReplayed = replayEdit(secondaryRegion, entry);
492      }
493    }
494
495    // at this point, there should be some data (rows 0-100) in memstore snapshot
496    // and some more data in memstores (rows 100-200)
497
498    verifyData(secondaryRegion, 0, numRows, cq, families);
499
500    // Test case 1: replay the same flush start marker again
501    LOG.info("-- Replaying same flush start in secondary again");
502    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
503    assertNull(result); // this should return null. Ignoring the flush start marker
504    // assert that we still have prepared flush with the previous setup.
505    assertNotNull(secondaryRegion.getPrepareFlushResult());
506    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
507      startFlushDesc.getFlushSequenceNumber());
508    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
509    verifyData(secondaryRegion, 0, numRows, cq, families);
510
511    // Test case 2: replay a flush start marker with a smaller seqId
512    FlushDescriptor startFlushDescSmallerSeqId
513      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
514    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
515    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
516    assertNull(result); // this should return null. Ignoring the flush start marker
517    // assert that we still have prepared flush with the previous setup.
518    assertNotNull(secondaryRegion.getPrepareFlushResult());
519    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
520      startFlushDesc.getFlushSequenceNumber());
521    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
522    verifyData(secondaryRegion, 0, numRows, cq, families);
523
524    // Test case 3: replay a flush start marker with a larger seqId
525    FlushDescriptor startFlushDescLargerSeqId
526      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
527    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
528    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
529    assertNull(result); // this should return null. Ignoring the flush start marker
530    // assert that we still have prepared flush with the previous setup.
531    assertNotNull(secondaryRegion.getPrepareFlushResult());
532    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
533      startFlushDesc.getFlushSequenceNumber());
534    assertTrue(secondaryRegion.getMemStoreDataSize() > 0); // memstore is not empty
535    verifyData(secondaryRegion, 0, numRows, cq, families);
536
537    LOG.info("-- Verifying edits from secondary");
538    verifyData(secondaryRegion, 0, numRows, cq, families);
539
540    LOG.info("-- Verifying edits from primary.");
541    verifyData(primaryRegion, 0, numRows, cq, families);
542  }
543
544  /**
545   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
546   * less than the previous flush start marker.
547   */
548  @Test
549  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
550    // load some data to primary and flush. 2 flushes and some more unflushed data
551    putDataWithFlushes(primaryRegion, 100, 200, 100);
552    int numRows = 300;
553
554    // now replay the edits and the flush marker
555    reader =  createWALReaderForPrimary();
556
557    LOG.info("-- Replaying edits and flush events in secondary");
558    FlushDescriptor startFlushDesc = null;
559    FlushDescriptor commitFlushDesc = null;
560
561    int lastReplayed = 0;
562    while (true) {
563      System.out.println(lastReplayed);
564      WAL.Entry entry = reader.next();
565      if (entry == null) {
566        break;
567      }
568      FlushDescriptor flushDesc
569      = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
570      if (flushDesc != null) {
571        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
572          // don't replay the first flush start marker, hold on to it, replay the second one
573          if (startFlushDesc == null) {
574            startFlushDesc = flushDesc;
575          } else {
576            LOG.info("-- Replaying flush start in secondary");
577            startFlushDesc = flushDesc;
578            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
579            assertNull(result.result);
580          }
581        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
582          // do not replay any flush commit yet
583          if (commitFlushDesc == null) {
584            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
585          }
586        }
587        // after replay verify that everything is still visible
588        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
589      } else {
590        lastReplayed = replayEdit(secondaryRegion, entry);
591      }
592    }
593
594    // at this point, there should be some data (rows 0-200) in memstore snapshot
595    // and some more data in memstores (rows 200-300)
596    verifyData(secondaryRegion, 0, numRows, cq, families);
597
598    // no store files in the region
599    int expectedStoreFileCount = 0;
600    for (HStore s : secondaryRegion.getStores()) {
601      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
602    }
603    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
604
605    // Test case 1: replay the a flush commit marker smaller than what we have prepared
606    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
607        + startFlushDesc);
608    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
609
610    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
611    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
612
613    // assert that the flush files are picked
614    expectedStoreFileCount++;
615    for (HStore s : secondaryRegion.getStores()) {
616      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
617    }
618    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
619    MemStoreSize mss = store.getFlushableSize();
620    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
621
622    // assert that the region memstore is same as before
623    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
624    assertEquals(regionMemstoreSize, newRegionMemstoreSize);
625
626    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped
627
628    LOG.info("-- Verifying edits from secondary");
629    verifyData(secondaryRegion, 0, numRows, cq, families);
630
631    LOG.info("-- Verifying edits from primary.");
632    verifyData(primaryRegion, 0, numRows, cq, families);
633  }
634
635  /**
636   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
637   * larger than the previous flush start marker.
638   */
639  @Test
640  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
641    // load some data to primary and flush. 1 flush and some more unflushed data
642    putDataWithFlushes(primaryRegion, 100, 100, 100);
643    int numRows = 200;
644
645    // now replay the edits and the flush marker
646    reader =  createWALReaderForPrimary();
647
648    LOG.info("-- Replaying edits and flush events in secondary");
649    FlushDescriptor startFlushDesc = null;
650    FlushDescriptor commitFlushDesc = null;
651
652    int lastReplayed = 0;
653    while (true) {
654      WAL.Entry entry = reader.next();
655      if (entry == null) {
656        break;
657      }
658      FlushDescriptor flushDesc
659      = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
660      if (flushDesc != null) {
661        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
662          if (startFlushDesc == null) {
663            LOG.info("-- Replaying flush start in secondary");
664            startFlushDesc = flushDesc;
665            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
666            assertNull(result.result);
667          }
668        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
669          // do not replay any flush commit yet
670          // hold on to the flush commit marker but simulate a larger
671          // flush commit seqId
672          commitFlushDesc =
673              FlushDescriptor.newBuilder(flushDesc)
674              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
675              .build();
676        }
677        // after replay verify that everything is still visible
678        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
679      } else {
680        lastReplayed = replayEdit(secondaryRegion, entry);
681      }
682    }
683
684    // at this point, there should be some data (rows 0-100) in memstore snapshot
685    // and some more data in memstores (rows 100-200)
686    verifyData(secondaryRegion, 0, numRows, cq, families);
687
688    // no store files in the region
689    int expectedStoreFileCount = 0;
690    for (HStore s : secondaryRegion.getStores()) {
691      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
692    }
693    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
694
695    // Test case 1: replay the a flush commit marker larger than what we have prepared
696    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
697        + startFlushDesc);
698    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
699
700    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
701    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
702
703    // assert that the flush files are picked
704    expectedStoreFileCount++;
705    for (HStore s : secondaryRegion.getStores()) {
706      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
707    }
708    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
709    MemStoreSize mss = store.getFlushableSize();
710    assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
711
712    // assert that the region memstore is smaller than before, but not empty
713    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
714    assertTrue(newRegionMemstoreSize > 0);
715    assertTrue(regionMemstoreSize > newRegionMemstoreSize);
716
717    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped
718
719    LOG.info("-- Verifying edits from secondary");
720    verifyData(secondaryRegion, 0, numRows, cq, families);
721
722    LOG.info("-- Verifying edits from primary.");
723    verifyData(primaryRegion, 0, numRows, cq, families);
724  }
725
726  /**
727   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
728   * The memstore edits should be dropped after the flush commit replay since they should be in
729   * flushed files
730   */
731  @Test
732  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
733      throws IOException {
734    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
735  }
736
737  /**
738   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
739   * The memstore edits should be not dropped after the flush commit replay since not every edit
740   * will be in flushed files (based on seqId)
741   */
742  @Test
743  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
744      throws IOException {
745    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
746  }
747
748  /**
749   * Tests the case where we receive a flush commit before receiving any flush prepare markers
750   */
751  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
752      throws IOException {
753    // load some data to primary and flush. 1 flushes and some more unflushed data.
754    // write more data after flush depending on whether droppableSnapshot
755    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
756    int numRows = droppableMemstore ? 100 : 200;
757
758    // now replay the edits and the flush marker
759    reader =  createWALReaderForPrimary();
760
761    LOG.info("-- Replaying edits and flush events in secondary");
762    FlushDescriptor commitFlushDesc = null;
763
764    int lastReplayed = 0;
765    while (true) {
766      WAL.Entry entry = reader.next();
767      if (entry == null) {
768        break;
769      }
770      FlushDescriptor flushDesc
771      = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
772      if (flushDesc != null) {
773        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
774          // do not replay flush start marker
775        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
776          commitFlushDesc = flushDesc; // hold on to the flush commit marker
777        }
778        // after replay verify that everything is still visible
779        verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
780      } else {
781        lastReplayed = replayEdit(secondaryRegion, entry);
782      }
783    }
784
785    // at this point, there should be some data (rows 0-200) in the memstore without snapshot
786    // and some more data in memstores (rows 100-300)
787    verifyData(secondaryRegion, 0, numRows, cq, families);
788
789    // no store files in the region
790    int expectedStoreFileCount = 0;
791    for (HStore s : secondaryRegion.getStores()) {
792      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
793    }
794    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
795
796    // Test case 1: replay a flush commit marker without start flush marker
797    assertNull(secondaryRegion.getPrepareFlushResult());
798    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
799
800    // ensure all files are visible in secondary
801    for (HStore store : secondaryRegion.getStores()) {
802      assertTrue(store.getMaxSequenceId().orElse(0L) <= secondaryRegion.getReadPoint(null));
803    }
804
805    LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
806    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
807
808    // assert that the flush files are picked
809    expectedStoreFileCount++;
810    for (HStore s : secondaryRegion.getStores()) {
811      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
812    }
813    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
814    MemStoreSize mss = store.getFlushableSize();
815    if (droppableMemstore) {
816      // assert that the memstore is dropped
817      assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
818    } else {
819      assertTrue(mss.getHeapSize() > 0); // assert that the memstore is not dropped
820    }
821
822    // assert that the region memstore is same as before (we could not drop)
823    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
824    if (droppableMemstore) {
825      assertTrue(0 == newRegionMemstoreSize);
826    } else {
827      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
828    }
829
830    LOG.info("-- Verifying edits from secondary");
831    verifyData(secondaryRegion, 0, numRows, cq, families);
832
833    LOG.info("-- Verifying edits from primary.");
834    verifyData(primaryRegion, 0, numRows, cq, families);
835  }
836
837  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
838    return FlushDescriptor.newBuilder(flush)
839        .setFlushSequenceNumber(flushSeqId)
840        .build();
841  }
842
843  /**
844   * Tests replaying region open markers from primary region. Checks whether the files are picked up
845   */
846  @Test
847  public void testReplayRegionOpenEvent() throws IOException {
848    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
849    int numRows = 100;
850
851    // close the region and open again.
852    primaryRegion.close();
853    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
854
855    // now replay the edits and the flush marker
856    reader =  createWALReaderForPrimary();
857    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
858
859    LOG.info("-- Replaying edits and region events in secondary");
860    while (true) {
861      WAL.Entry entry = reader.next();
862      if (entry == null) {
863        break;
864      }
865      FlushDescriptor flushDesc
866        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
867      RegionEventDescriptor regionEventDesc
868        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
869
870      if (flushDesc != null) {
871        // don't replay flush events
872      } else if (regionEventDesc != null) {
873        regionEvents.add(regionEventDesc);
874      } else {
875        // don't replay edits
876      }
877    }
878
879    // we should have 1 open, 1 close and 1 open event
880    assertEquals(3, regionEvents.size());
881
882    // replay the first region open event.
883    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
884
885    // replay the close event as well
886    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
887
888    // no store files in the region
889    int expectedStoreFileCount = 0;
890    for (HStore s : secondaryRegion.getStores()) {
891      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
892    }
893    long regionMemstoreSize = secondaryRegion.getMemStoreDataSize();
894    assertTrue(regionMemstoreSize == 0);
895
896    // now replay the region open event that should contain new file locations
897    LOG.info("Testing replaying region open event " + regionEvents.get(2));
898    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
899
900    // assert that the flush files are picked
901    expectedStoreFileCount++;
902    for (HStore s : secondaryRegion.getStores()) {
903      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
904    }
905    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
906    MemStoreSize mss = store.getFlushableSize();
907    assertTrue(mss.getHeapSize() == MutableSegment.DEEP_OVERHEAD);
908
909    // assert that the region memstore is empty
910    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
911    assertTrue(newRegionMemstoreSize == 0);
912
913    assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
914
915    LOG.info("-- Verifying edits from secondary");
916    verifyData(secondaryRegion, 0, numRows, cq, families);
917
918    LOG.info("-- Verifying edits from primary.");
919    verifyData(primaryRegion, 0, numRows, cq, families);
920  }
921
922  /**
923   * Tests the case where we replay a region open event after a flush start but before receiving
924   * flush commit
925   */
926  @Test
927  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
928    putDataWithFlushes(primaryRegion, 100, 100, 100);
929    int numRows = 200;
930
931    // close the region and open again.
932    primaryRegion.close();
933    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
934
935    // now replay the edits and the flush marker
936    reader =  createWALReaderForPrimary();
937    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
938
939    LOG.info("-- Replaying edits and region events in secondary");
940    while (true) {
941      WAL.Entry entry = reader.next();
942      if (entry == null) {
943        break;
944      }
945      FlushDescriptor flushDesc
946        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
947      RegionEventDescriptor regionEventDesc
948        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
949
950      if (flushDesc != null) {
951        // only replay flush start
952        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
953          secondaryRegion.replayWALFlushStartMarker(flushDesc);
954        }
955      } else if (regionEventDesc != null) {
956        regionEvents.add(regionEventDesc);
957      } else {
958        replayEdit(secondaryRegion, entry);
959      }
960    }
961
962    // at this point, there should be some data (rows 0-100) in the memstore snapshot
963    // and some more data in memstores (rows 100-200)
964    verifyData(secondaryRegion, 0, numRows, cq, families);
965
966    // we should have 1 open, 1 close and 1 open event
967    assertEquals(3, regionEvents.size());
968
969    // no store files in the region
970    int expectedStoreFileCount = 0;
971    for (HStore s : secondaryRegion.getStores()) {
972      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
973    }
974
975    // now replay the region open event that should contain new file locations
976    LOG.info("Testing replaying region open event " + regionEvents.get(2));
977    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
978
979    // assert that the flush files are picked
980    expectedStoreFileCount = 2; // two flushes happened
981    for (HStore s : secondaryRegion.getStores()) {
982      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
983    }
984    HStore store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
985    MemStoreSize newSnapshotSize = store.getSnapshotSize();
986    assertTrue(newSnapshotSize.getDataSize() == 0);
987
988    // assert that the region memstore is empty
989    long newRegionMemstoreSize = secondaryRegion.getMemStoreDataSize();
990    assertTrue(newRegionMemstoreSize == 0);
991
992    assertNull(secondaryRegion.getPrepareFlushResult()); //prepare snapshot should be dropped if any
993
994    LOG.info("-- Verifying edits from secondary");
995    verifyData(secondaryRegion, 0, numRows, cq, families);
996
997    LOG.info("-- Verifying edits from primary.");
998    verifyData(primaryRegion, 0, numRows, cq, families);
999  }
1000
1001  /**
1002   * Tests whether edits coming in for replay are skipped which have smaller seq id than the seqId
1003   * of the last replayed region open event.
1004   */
1005  @Test
1006  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
1007    putDataWithFlushes(primaryRegion, 100, 100, 0);
1008    int numRows = 100;
1009
1010    // close the region and open again.
1011    primaryRegion.close();
1012    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1013
1014    // now replay the edits and the flush marker
1015    reader =  createWALReaderForPrimary();
1016    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
1017    List<WAL.Entry> edits = Lists.newArrayList();
1018
1019    LOG.info("-- Replaying edits and region events in secondary");
1020    while (true) {
1021      WAL.Entry entry = reader.next();
1022      if (entry == null) {
1023        break;
1024      }
1025      FlushDescriptor flushDesc
1026        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1027      RegionEventDescriptor regionEventDesc
1028        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1029
1030      if (flushDesc != null) {
1031        // don't replay flushes
1032      } else if (regionEventDesc != null) {
1033        regionEvents.add(regionEventDesc);
1034      } else {
1035        edits.add(entry);
1036      }
1037    }
1038
1039    // replay the region open of first open, but with the seqid of the second open
1040    // this way non of the flush files will be picked up.
1041    secondaryRegion.replayWALRegionEventMarker(
1042      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
1043        regionEvents.get(2).getLogSequenceNumber()).build());
1044
1045
1046    // replay edits from the before region close. If replay does not
1047    // skip these the following verification will NOT fail.
1048    for (WAL.Entry entry: edits) {
1049      replayEdit(secondaryRegion, entry);
1050    }
1051
1052    boolean expectedFail = false;
1053    try {
1054      verifyData(secondaryRegion, 0, numRows, cq, families);
1055    } catch (AssertionError e) {
1056      expectedFail = true; // expected
1057    }
1058    if (!expectedFail) {
1059      fail("Should have failed this verification");
1060    }
1061  }
1062
1063  @Test
1064  public void testReplayFlushSeqIds() throws IOException {
1065    // load some data to primary and flush
1066    int start = 0;
1067    LOG.info("-- Writing some data to primary from " +  start + " to " + (start+100));
1068    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1069    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1070    primaryRegion.flush(true);
1071
1072    // now replay the flush marker
1073    reader =  createWALReaderForPrimary();
1074
1075    long flushSeqId = -1;
1076    LOG.info("-- Replaying flush events in secondary");
1077    while (true) {
1078      WAL.Entry entry = reader.next();
1079      if (entry == null) {
1080        break;
1081      }
1082      FlushDescriptor flushDesc
1083        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1084      if (flushDesc != null) {
1085        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1086          LOG.info("-- Replaying flush start in secondary");
1087          secondaryRegion.replayWALFlushStartMarker(flushDesc);
1088          flushSeqId = flushDesc.getFlushSequenceNumber();
1089        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1090          LOG.info("-- Replaying flush commit in secondary");
1091          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1092          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1093        }
1094      }
1095      // else do not replay
1096    }
1097
1098    // TODO: what to do with this?
1099    // assert that the newly picked up flush file is visible
1100    long readPoint = secondaryRegion.getMVCC().getReadPoint();
1101    assertEquals(flushSeqId, readPoint);
1102
1103    // after replay verify that everything is still visible
1104    verifyData(secondaryRegion, 0, 100, cq, families);
1105  }
1106
1107  @Test
1108  public void testSeqIdsFromReplay() throws IOException {
1109    // test the case where seqId's coming from replayed WALEdits are made persisted with their
1110    // original seqIds and they are made visible through mvcc read point upon replay
1111    String method = name.getMethodName();
1112    byte[] tableName = Bytes.toBytes(method);
1113    byte[] family = Bytes.toBytes("family");
1114
1115    HRegion region = initHRegion(tableName, method, family);
1116    try {
1117      // replay an entry that is bigger than current read point
1118      long readPoint = region.getMVCC().getReadPoint();
1119      long origSeqId = readPoint + 100;
1120
1121      Put put = new Put(row).addColumn(family, row, row);
1122      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
1123      replay(region, put, origSeqId);
1124
1125      // read point should have advanced to this seqId
1126      assertGet(region, family, row);
1127
1128      // region seqId should have advanced at least to this seqId
1129      assertEquals(origSeqId, region.getReadPoint(null));
1130
1131      // replay an entry that is smaller than current read point
1132      // caution: adding an entry below current read point might cause partial dirty reads. Normal
1133      // replay does not allow reads while replay is going on.
1134      put = new Put(row2).addColumn(family, row2, row2);
1135      put.setDurability(Durability.SKIP_WAL);
1136      replay(region, put, origSeqId - 50);
1137
1138      assertGet(region, family, row2);
1139    } finally {
1140      region.close();
1141    }
1142  }
1143
1144  /**
1145   * Tests that a region opened in secondary mode would not write region open / close
1146   * events to its WAL.
1147   * @throws IOException
1148   */
1149  @Test
1150  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1151    secondaryRegion.close();
1152    walSecondary = spy(walSecondary);
1153
1154    // test for region open and close
1155    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1156    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
1157      any(WALEdit.class), anyBoolean());
1158
1159    // test for replay prepare flush
1160    putDataByReplay(secondaryRegion, 0, 10, cq, families);
1161    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
1162      setFlushSequenceNumber(10)
1163      .setTableName(UnsafeByteOperations.unsafeWrap(
1164          primaryRegion.getTableDescriptor().getTableName().getName()))
1165      .setAction(FlushAction.START_FLUSH)
1166      .setEncodedRegionName(
1167          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1168      .setRegionName(UnsafeByteOperations.unsafeWrap(
1169          primaryRegion.getRegionInfo().getRegionName()))
1170      .build());
1171
1172    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
1173      any(WALEdit.class), anyBoolean());
1174
1175    secondaryRegion.close();
1176    verify(walSecondary, times(0)).append(any(RegionInfo.class), any(WALKeyImpl.class),
1177      any(WALEdit.class), anyBoolean());
1178  }
1179
1180  /**
1181   * Tests the reads enabled flag for the region. When unset all reads should be rejected
1182   */
1183  @Test
1184  public void testRegionReadsEnabledFlag() throws IOException {
1185
1186    putDataByReplay(secondaryRegion, 0, 100, cq, families);
1187
1188    verifyData(secondaryRegion, 0, 100, cq, families);
1189
1190    // now disable reads
1191    secondaryRegion.setReadsEnabled(false);
1192    try {
1193      verifyData(secondaryRegion, 0, 100, cq, families);
1194      fail("Should have failed with IOException");
1195    } catch(IOException ex) {
1196      // expected
1197    }
1198
1199    // verify that we can still replay data
1200    putDataByReplay(secondaryRegion, 100, 100, cq, families);
1201
1202    // now enable reads again
1203    secondaryRegion.setReadsEnabled(true);
1204    verifyData(secondaryRegion, 0, 200, cq, families);
1205  }
1206
1207  /**
1208   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
1209   * It should write the flush request marker instead.
1210   */
1211  @Test
1212  public void testWriteFlushRequestMarker() throws IOException {
1213    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
1214    FlushResultImpl result = primaryRegion.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
1215    assertNotNull(result);
1216    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1217    assertFalse(result.wroteFlushWalMarker);
1218
1219    // request flush again, but this time with writeFlushRequestWalMarker = true
1220    result = primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1221    assertNotNull(result);
1222    assertEquals(FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.result);
1223    assertTrue(result.wroteFlushWalMarker);
1224
1225    List<FlushDescriptor> flushes = Lists.newArrayList();
1226    reader = createWALReaderForPrimary();
1227    while (true) {
1228      WAL.Entry entry = reader.next();
1229      if (entry == null) {
1230        break;
1231      }
1232      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1233      if (flush != null) {
1234        flushes.add(flush);
1235      }
1236    }
1237
1238    assertEquals(1, flushes.size());
1239    assertNotNull(flushes.get(0));
1240    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1241  }
1242
1243  /**
1244   * Test the case where the secondary region replica is not in reads enabled state because it is
1245   * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
1246   * flush marker entry should restore the reads enabled status in the region and allow the reads
1247   * to continue.
1248   */
1249  @Test
1250  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1251    disableReads(secondaryRegion);
1252
1253    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
1254    // triggered flush restores readsEnabled
1255    primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
1256    reader = createWALReaderForPrimary();
1257    while (true) {
1258      WAL.Entry entry = reader.next();
1259      if (entry == null) {
1260        break;
1261      }
1262      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1263      if (flush != null) {
1264        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1265      }
1266    }
1267
1268    // now reads should be enabled
1269    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1270  }
1271
1272  /**
1273   * Test the case where the secondary region replica is not in reads enabled state because it is
1274   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1275   * entries should restore the reads enabled status in the region and allow the reads
1276   * to continue.
1277   */
1278  @Test
1279  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1280    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1281    // from triggered flush restores readsEnabled
1282    disableReads(secondaryRegion);
1283
1284    // put some data in primary
1285    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1286    primaryRegion.flush(true);
1287    // I seem to need to push more edits through so the WAL flushes on local fs. This was not
1288    // needed before HBASE-15028. Not sure whats up. I can see that we have not flushed if I
1289    // look at the WAL if I pause the test here and then use WALPrettyPrinter to look at content..
1290    // Doing same check before HBASE-15028 I can see all edits flushed to the WAL. Somethings up
1291    // but can't figure it... and this is only test that seems to suffer this flush issue.
1292    // St.Ack 20160201
1293    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1294
1295    reader = createWALReaderForPrimary();
1296    while (true) {
1297      WAL.Entry entry = reader.next();
1298      LOG.info(Objects.toString(entry));
1299      if (entry == null) {
1300        break;
1301      }
1302      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1303      if (flush != null) {
1304        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1305      } else {
1306        replayEdit(secondaryRegion, entry);
1307      }
1308    }
1309
1310    // now reads should be enabled
1311    verifyData(secondaryRegion, 0, 100, cq, families);
1312  }
1313
1314  /**
1315   * Test the case where the secondary region replica is not in reads enabled state because it is
1316   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1317   * entries should restore the reads enabled status in the region and allow the reads
1318   * to continue.
1319   */
1320  @Test
1321  public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1322    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1323    // from triggered flush restores readsEnabled
1324    disableReads(secondaryRegion);
1325
1326    // put some data in primary
1327    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1328    primaryRegion.flush(true);
1329
1330    reader = createWALReaderForPrimary();
1331    while (true) {
1332      WAL.Entry entry = reader.next();
1333      if (entry == null) {
1334        break;
1335      }
1336      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1337      if (flush != null) {
1338        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
1339      }
1340    }
1341
1342    // now reads should be enabled
1343    verifyData(secondaryRegion, 0, 100, cq, families);
1344  }
1345
1346  /**
1347   * Test the case where the secondary region replica is not in reads enabled state because it is
1348   * waiting for a flush or region open marker from primary region. Replaying region open event
1349   * entry from primary should restore the reads enabled status in the region and allow the reads
1350   * to continue.
1351   */
1352  @Test
1353  public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1354    // Test case 3: Test that replaying region open event markers restores readsEnabled
1355    disableReads(secondaryRegion);
1356
1357    primaryRegion.close();
1358    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1359
1360    reader = createWALReaderForPrimary();
1361    while (true) {
1362      WAL.Entry entry = reader.next();
1363      if (entry == null) {
1364        break;
1365      }
1366
1367      RegionEventDescriptor regionEventDesc
1368        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1369
1370      if (regionEventDesc != null) {
1371        secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1372      }
1373    }
1374
1375    // now reads should be enabled
1376    secondaryRegion.get(new Get(Bytes.toBytes(0)));
1377  }
1378
1379  @Test
1380  public void testRefresStoreFiles() throws IOException {
1381    assertEquals(0, primaryRegion.getStoreFileList(families).size());
1382    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1383
1384    // Test case 1: refresh with an empty region
1385    secondaryRegion.refreshStoreFiles();
1386    assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1387
1388    // do one flush
1389    putDataWithFlushes(primaryRegion, 100, 100, 0);
1390    int numRows = 100;
1391
1392    // refresh the store file list, and ensure that the files are picked up.
1393    secondaryRegion.refreshStoreFiles();
1394    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1395      secondaryRegion.getStoreFileList(families));
1396    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1397
1398    LOG.info("-- Verifying edits from secondary");
1399    verifyData(secondaryRegion, 0, numRows, cq, families);
1400
1401    // Test case 2: 3 some more flushes
1402    putDataWithFlushes(primaryRegion, 100, 300, 0);
1403    numRows = 300;
1404
1405    // refresh the store file list, and ensure that the files are picked up.
1406    secondaryRegion.refreshStoreFiles();
1407    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1408      secondaryRegion.getStoreFileList(families));
1409    assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1410
1411    LOG.info("-- Verifying edits from secondary");
1412    verifyData(secondaryRegion, 0, numRows, cq, families);
1413
1414    if (FSUtils.WINDOWS) {
1415      // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1416      return;
1417    }
1418
1419    // Test case 3: compact primary files
1420    primaryRegion.compactStores();
1421    List<HRegion> regions = new ArrayList<>();
1422    regions.add(primaryRegion);
1423    Mockito.doReturn(regions).when(rss).getRegions();
1424    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
1425    cleaner.chore();
1426    secondaryRegion.refreshStoreFiles();
1427    assertPathListsEqual(primaryRegion.getStoreFileList(families),
1428      secondaryRegion.getStoreFileList(families));
1429    assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1430
1431    LOG.info("-- Verifying edits from secondary");
1432    verifyData(secondaryRegion, 0, numRows, cq, families);
1433
1434    LOG.info("-- Replaying edits in secondary");
1435
1436    // Test case 4: replay some edits, ensure that memstore is dropped.
1437    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1438    putDataWithFlushes(primaryRegion, 400, 400, 0);
1439    numRows = 400;
1440
1441    reader =  createWALReaderForPrimary();
1442    while (true) {
1443      WAL.Entry entry = reader.next();
1444      if (entry == null) {
1445        break;
1446      }
1447      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1448      if (flush != null) {
1449        // do not replay flush
1450      } else {
1451        replayEdit(secondaryRegion, entry);
1452      }
1453    }
1454
1455    assertTrue(secondaryRegion.getMemStoreDataSize() > 0);
1456
1457    secondaryRegion.refreshStoreFiles();
1458
1459    assertTrue(secondaryRegion.getMemStoreDataSize() == 0);
1460
1461    LOG.info("-- Verifying edits from primary");
1462    verifyData(primaryRegion, 0, numRows, cq, families);
1463    LOG.info("-- Verifying edits from secondary");
1464    verifyData(secondaryRegion, 0, numRows, cq, families);
1465  }
1466
1467  /**
1468   * Paths can be qualified or not. This does the assertion using String->Path conversion.
1469   */
1470  private void assertPathListsEqual(List<String> list1, List<String> list2) {
1471    List<Path> l1 = new ArrayList<>(list1.size());
1472    for (String path : list1) {
1473      l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1474    }
1475    List<Path> l2 = new ArrayList<>(list2.size());
1476    for (String path : list2) {
1477      l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1478    }
1479    assertEquals(l1, l2);
1480  }
1481
1482  private void disableReads(HRegion region) {
1483    region.setReadsEnabled(false);
1484    try {
1485      verifyData(region, 0, 1, cq, families);
1486      fail("Should have failed with IOException");
1487    } catch(IOException ex) {
1488      // expected
1489    }
1490  }
1491
1492  private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1493    put.setDurability(Durability.SKIP_WAL);
1494    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1495    region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
1496  }
1497
1498  /**
1499   * Tests replaying region open markers from primary region. Checks whether the files are picked up
1500   */
1501  @Test
1502  public void testReplayBulkLoadEvent() throws IOException {
1503    LOG.info("testReplayBulkLoadEvent starts");
1504    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1505
1506    // close the region and open again.
1507    primaryRegion.close();
1508    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1509
1510    // bulk load a file into primary region
1511    Random random = new Random();
1512    byte[] randomValues = new byte[20];
1513    random.nextBytes(randomValues);
1514    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1515
1516    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1517    int expectedLoadFileCount = 0;
1518    for (byte[] family : families) {
1519      familyPaths.add(new Pair<>(family, createHFileForFamilies(testPath, family, randomValues)));
1520      expectedLoadFileCount++;
1521    }
1522    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1523
1524    // now replay the edits and the bulk load marker
1525    reader = createWALReaderForPrimary();
1526
1527    LOG.info("-- Replaying edits and region events in secondary");
1528    BulkLoadDescriptor bulkloadEvent = null;
1529    while (true) {
1530      WAL.Entry entry = reader.next();
1531      if (entry == null) {
1532        break;
1533      }
1534      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1535      if (bulkloadEvent != null) {
1536        break;
1537      }
1538    }
1539
1540    // we should have 1 bulk load event
1541    assertTrue(bulkloadEvent != null);
1542    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1543
1544    // replay the bulk load event
1545    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1546
1547
1548    List<String> storeFileName = new ArrayList<>();
1549    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1550      storeFileName.addAll(storeDesc.getStoreFileList());
1551    }
1552    // assert that the bulk loaded files are picked
1553    for (HStore s : secondaryRegion.getStores()) {
1554      for (HStoreFile sf : s.getStorefiles()) {
1555        storeFileName.remove(sf.getPath().getName());
1556      }
1557    }
1558    assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1559
1560    LOG.info("-- Verifying edits from secondary");
1561    for (byte[] family : families) {
1562      assertGet(secondaryRegion, family, randomValues);
1563    }
1564  }
1565
1566  @Test
1567  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1568    // tests replaying flush commit marker, but the flush file has already been compacted
1569    // from primary and also deleted from the archive directory
1570    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
1571      setFlushSequenceNumber(Long.MAX_VALUE)
1572      .setTableName(UnsafeByteOperations.unsafeWrap(primaryRegion.getTableDescriptor().getTableName().getName()))
1573      .setAction(FlushAction.COMMIT_FLUSH)
1574      .setEncodedRegionName(
1575          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1576      .setRegionName(UnsafeByteOperations.unsafeWrap(
1577          primaryRegion.getRegionInfo().getRegionName()))
1578      .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1579        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1580        .setStoreHomeDir("/store_home_dir")
1581        .addFlushOutput("/foo/baz/bar")
1582        .build())
1583      .build());
1584  }
1585
1586  @Test
1587  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1588    // tests replaying compaction marker, but the compaction output file has already been compacted
1589    // from primary and also deleted from the archive directory
1590    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
1591      .setTableName(UnsafeByteOperations.unsafeWrap(
1592          primaryRegion.getTableDescriptor().getTableName().getName()))
1593      .setEncodedRegionName(
1594          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1595      .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1596      .addCompactionInput("/foo")
1597      .addCompactionOutput("/bar")
1598      .setStoreHomeDir("/store_home_dir")
1599      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1600      .build()
1601      , true, true, Long.MAX_VALUE);
1602  }
1603
1604  @Test
1605  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1606    // tests replaying region open event marker, but the region files have already been compacted
1607    // from primary and also deleted from the archive directory
1608    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1609      .setTableName(UnsafeByteOperations.unsafeWrap(
1610          primaryRegion.getTableDescriptor().getTableName().getName()))
1611      .setEncodedRegionName(
1612          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1613      .setRegionName(UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getRegionName()))
1614      .setEventType(EventType.REGION_OPEN)
1615      .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1616      .setLogSequenceNumber(Long.MAX_VALUE)
1617      .addStores(StoreDescriptor.newBuilder()
1618        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1619        .setStoreHomeDir("/store_home_dir")
1620        .addStoreFile("/foo")
1621        .build())
1622      .build());
1623  }
1624
1625  @Test
1626  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1627    // tests replaying bulk load event marker, but the bulk load files have already been compacted
1628    // from primary and also deleted from the archive directory
1629    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1630      .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDescriptor().getTableName()))
1631      .setEncodedRegionName(
1632          UnsafeByteOperations.unsafeWrap(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1633      .setBulkloadSeqNum(Long.MAX_VALUE)
1634      .addStores(StoreDescriptor.newBuilder()
1635        .setFamilyName(UnsafeByteOperations.unsafeWrap(families[0]))
1636        .setStoreHomeDir("/store_home_dir")
1637        .addStoreFile("/foo")
1638        .build())
1639      .build());
1640  }
1641
1642  private String createHFileForFamilies(Path testPath, byte[] family,
1643      byte[] valueBytes) throws IOException {
1644    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1645    // TODO We need a way to do this without creating files
1646    Path testFile = new Path(testPath, UUID.randomUUID().toString());
1647    FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1648    try {
1649      hFileFactory.withOutputStream(out);
1650      hFileFactory.withFileContext(new HFileContext());
1651      HFile.Writer writer = hFileFactory.create();
1652      try {
1653        writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
1654          KeyValue.Type.Put.getCode(), valueBytes)));
1655      } finally {
1656        writer.close();
1657      }
1658    } finally {
1659      out.close();
1660    }
1661    return testFile.toString();
1662  }
1663
1664  /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
1665   * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of
1666   * more rows but does not execute flush after
1667   * @throws IOException */
1668  private void putDataWithFlushes(HRegion region, int flushInterval,
1669      int numRows, int numRowsAfterFlush) throws IOException {
1670    int start = 0;
1671    for (; start < numRows; start += flushInterval) {
1672      LOG.info("-- Writing some data to primary from " +  start + " to " + (start+flushInterval));
1673      putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1674      LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1675      region.flush(true);
1676    }
1677    LOG.info("-- Writing some more data to primary, not flushing");
1678    putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1679  }
1680
1681  private void putDataByReplay(HRegion region,
1682      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
1683    for (int i = startRow; i < startRow + numRows; i++) {
1684      Put put = new Put(Bytes.toBytes("" + i));
1685      put.setDurability(Durability.SKIP_WAL);
1686      for (byte[] family : families) {
1687        put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
1688      }
1689      replay(region, put, i+1);
1690    }
1691  }
1692
1693  private static HRegion initHRegion(byte[] tableName,
1694      String callingMethod, byte[]... families) throws IOException {
1695    return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1696      callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1697  }
1698
1699  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1700      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1701      WAL wal, byte[]... families) throws IOException {
1702    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1703      isReadOnly, durability, wal, families);
1704  }
1705}