001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.MemoryCompactionPolicy;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.client.RegionInfoBuilder;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047import org.mockito.Mockito;
048
049/**
050 * This test verifies the correctness of the Per Column Family flushing strategy when part of the
051 * memstores are compacted memstores
052 */
053@Tag(RegionServerTests.TAG)
054@Tag(LargeTests.TAG)
055public class TestWalAndCompactingMemStoreFlush {
056
057  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
058  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
059  public static final TableName TABLENAME =
060    TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");
061
062  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
063    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
064
065  public static final byte[] FAMILY1 = FAMILIES[0];
066  public static final byte[] FAMILY2 = FAMILIES[1];
067  public static final byte[] FAMILY3 = FAMILIES[2];
068
069  private Configuration conf;
070
071  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
072    int i = 0;
073    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
074    for (byte[] family : FAMILIES) {
075      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
076      // even column families are going to have compacted memstore
077      if (i % 2 == 0) {
078        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy
079          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
080      } else {
081        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
082      }
083      builder.setColumnFamily(cfBuilder.build());
084      i++;
085    }
086
087    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
088    Path path = new Path(DIR, callingMethod);
089    HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false);
090    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
091    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
092    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
093    region.initialize(null);
094    return region;
095  }
096
097  // A helper function to create puts.
098  private Put createPut(int familyNum, int putNum) {
099    byte[] qf = Bytes.toBytes("q" + familyNum);
100    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
101    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
102    Put p = new Put(row);
103    p.addColumn(FAMILIES[familyNum - 1], qf, val);
104    return p;
105  }
106
107  // A helper function to create double puts, so something can be compacted later.
108  private Put createDoublePut(int familyNum, int putNum) {
109    byte[] qf = Bytes.toBytes("q" + familyNum);
110    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
111    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
112    Put p = new Put(row);
113    // add twice with different timestamps
114    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
115    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
116    return p;
117  }
118
119  private void verifyInMemoryFlushSize(Region region) {
120    assertEquals(
121      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
122      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
123  }
124
125  @BeforeEach
126  public void setup() {
127    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
128    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
129      FlushNonSloppyStoresFirstPolicy.class.getName());
130    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
131  }
132
133  @Test
134  public void testSelectiveFlushWithEager() throws IOException {
135    // Set up the configuration
136    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
137    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
138    // set memstore to do data compaction
139    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
140      String.valueOf(MemoryCompactionPolicy.EAGER));
141
142    // Intialize the region
143    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
144    verifyInMemoryFlushSize(region);
145    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
146    for (int i = 1; i <= 1200; i++) {
147      region.put(createPut(1, i)); // compacted memstore, all the keys are unique
148
149      if (i <= 100) {
150        region.put(createPut(2, i));
151        if (i <= 50) {
152          // compacted memstore, subject for compaction due to duplications
153          region.put(createDoublePut(3, i));
154        }
155      }
156    }
157
158    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
159    for (int i = 100; i < 2000; i++) {
160      region.put(createPut(2, i));
161    }
162
163    long totalMemstoreSize = region.getMemStoreDataSize();
164
165    // Find the smallest LSNs for edits wrt to each CF.
166    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
167    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
168    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
169
170    // Find the sizes of the memstores of each CF.
171    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
172    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
173    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
174
175    // Get the overall smallest LSN in the region's memstores.
176    long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
177      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
178
179    String s = "\n\n----------------------------------\n"
180      + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI
181      + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore()
182      + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
183      + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
184      + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
185
186    // The overall smallest LSN in the region's memstores should be the same as
187    // the LSN of the smallest edit in CF1
188    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
189
190    // Some other sanity checks.
191    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
192    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
193    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
194    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
195    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
196
197    // The total memstore size should be the same as the sum of the sizes of
198    // memstores of CF1, CF2 and CF3.
199    String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI="
200      + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI
201      + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
202    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
203      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize(), msg);
204
205    // Flush!!!!!!!!!!!!!!!!!!!!!!
206    // We have big compacting memstore CF1 and two small memstores:
207    // CF2 (not compacted) and CF3 (compacting)
208    // All together they are above the flush size lower bound.
209    // Since CF1 and CF3 should be flushed to memory (not to disk),
210    // CF2 is going to be flushed to disk.
211    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
212    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
213    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
214    cms1.flushInMemory();
215    cms3.flushInMemory();
216    region.flush(false);
217
218    // Recalculate everything
219    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
220    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
221    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
222
223    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
224      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
225    // Find the smallest LSNs for edits wrt to each CF.
226    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
227    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
228    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
229
230    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
231      + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
232      + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
233
234    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
235    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
236    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
237
238    // CF2 should become empty
239    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
240    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
241
242    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
243    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
244    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
245
246    // Now the smallest LSN in the region should be the same as the smallest
247    // LSN in the memstore of CF1.
248    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
249
250    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
251    // memory in next flush
252    for (int i = 1200; i < 3000; i++) {
253      region.put(createPut(1, i));
254    }
255
256    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
257      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", "
258      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
259      + smallestSeqCF3PhaseII + "\n";
260
261    // How much does the CF1 memstore occupy? Will be used later.
262    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
263    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
264
265    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
266      + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n";
267
268    // Flush!!!!!!!!!!!!!!!!!!!!!!
269    // Flush again, CF1 is flushed to disk
270    // CF2 is flushed to disk, because it is not in-memory compacted memstore
271    // CF3 is flushed empty to memory (actually nothing happens to CF3)
272    region.flush(false);
273
274    // Recalculate everything
275    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
276    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
277    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
278
279    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
280      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
281    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
282    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
283    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
284
285    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
286      + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n";
287
288    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
289      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
290      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
291      + smallestSeqCF3PhaseIV + "\n";
292
293    // CF1's pipeline component (inserted before first flush) should be flushed to disk
294    // CF2 should be flushed to disk
295    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
296    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
297    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
298
299    // CF3 shouldn't have been touched.
300    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
301
302    // the smallest LSN of CF3 shouldn't change
303    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
304
305    // CF3 should be bottleneck for WAL
306    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV, s);
307
308    // Flush!!!!!!!!!!!!!!!!!!!!!!
309    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
310    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
311    region.flush(true);
312
313    // Recalculate everything
314    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
315    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
316    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
317    long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
318      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
319
320    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
321    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
322    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
323    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
324    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
325    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
326
327    // What happens when we hit the memstore limit, but we are not able to find
328    // any Column Family above the threshold?
329    // In that case, we should flush all the CFs.
330
331    // The memstore limit is 100*1024 and the column family flush threshold is
332    // around 25*1024. We try to just hit the memstore limit with each CF's
333    // memstore being below the CF flush threshold.
334    for (int i = 1; i <= 300; i++) {
335      region.put(createPut(1, i));
336      region.put(createPut(2, i));
337      region.put(createPut(3, i));
338      region.put(createPut(4, i));
339      region.put(createPut(5, i));
340    }
341
342    region.flush(false);
343
344    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
345      + smallestSeqInRegionCurrentMemstorePhaseV
346      + ". After additional inserts and last flush, the entire region size is:"
347      + region.getMemStoreDataSize() + "\n----------------------------------\n";
348
349    // Since we won't find any CF above the threshold, and hence no specific
350    // store to flush, we should flush all the memstores
351    // Also compacted memstores are flushed to disk.
352    assertEquals(0, region.getMemStoreDataSize());
353    System.out.println(s);
354    HBaseTestingUtil.closeRegionAndWAL(region);
355  }
356
357  /*------------------------------------------------------------------------------*/
358  /* Check the same as above but for index-compaction type of compacting memstore */
359  @Test
360  public void testSelectiveFlushWithIndexCompaction() throws IOException {
361    /*------------------------------------------------------------------------------*/
362    /* SETUP */
363    // Set up the configuration
364    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
365    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
366    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
367    // set memstore to index-compaction
368    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
369      String.valueOf(MemoryCompactionPolicy.BASIC));
370
371    // Initialize the region
372    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
373    verifyInMemoryFlushSize(region);
374    /*------------------------------------------------------------------------------*/
375    /* PHASE I - insertions */
376    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
377    for (int i = 1; i <= 1200; i++) {
378      region.put(createPut(1, i)); // compacted memstore
379      if (i <= 100) {
380        region.put(createPut(2, i));
381        if (i <= 50) {
382          region.put(createDoublePut(3, i)); // subject for in-memory compaction
383        }
384      }
385    }
386    // Now add more puts for CF2, so that we only flush CF2 to disk
387    for (int i = 100; i < 2000; i++) {
388      region.put(createPut(2, i));
389    }
390
391    /*------------------------------------------------------------------------------*/
392    /*------------------------------------------------------------------------------*/
393    /* PHASE I - collect sizes */
394    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
395    // Find the smallest LSNs for edits wrt to each CF.
396    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
397    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
398    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
399    // Find the sizes of the memstores of each CF.
400    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
401    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
402    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
403    // Get the overall smallest LSN in the region's memstores.
404    long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
405      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
406
407    /*------------------------------------------------------------------------------*/
408    /* PHASE I - validation */
409    // The overall smallest LSN in the region's memstores should be the same as
410    // the LSN of the smallest edit in CF1
411    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
412    // Some other sanity checks.
413    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
414    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
415    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
416    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
417    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
418
419    // The total memstore size should be the same as the sum of the sizes of
420    // memstores of CF1, CF2 and CF3.
421    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
422      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
423
424    /*------------------------------------------------------------------------------*/
425    /* PHASE I - Flush */
426    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
427    // CF1, CF2, CF3, all together they are above the flush size lower bound.
428    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
429    // CF1 and CF3 - flushed to memory and flatten explicitly
430    region.flush(false);
431    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
432    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
433    cms1.flushInMemory();
434    cms3.flushInMemory();
435
436    // CF3/CF1 should be merged so wait here to be sure the compaction is done
437    while (
438      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
439        .isMemStoreFlushingInMemory()
440    ) {
441      Threads.sleep(10);
442    }
443    while (
444      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
445        .isMemStoreFlushingInMemory()
446    ) {
447      Threads.sleep(10);
448    }
449
450    /*------------------------------------------------------------------------------*/
451    /*------------------------------------------------------------------------------*/
452    /* PHASE II - collect sizes */
453    // Recalculate everything
454    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
455    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
456    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
457    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
458      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
459    // Find the smallest LSNs for edits wrt to each CF.
460    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
461    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
462
463    /*------------------------------------------------------------------------------*/
464    /* PHASE II - validation */
465    // CF1 was flushed to memory, should be flattened and take less space
466    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
467    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
468    // CF2 should become empty
469    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
470    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
471    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
472    // if compacted CF# should be at least twice less because its every key was duplicated
473    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
474    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
475
476    // Now the smallest LSN in the region should be the same as the smallest
477    // LSN in the memstore of CF1.
478    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
479    // The total memstore size should be the same as the sum of the sizes of
480    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
481    // items in CF1/2
482    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
483      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
484
485    /*------------------------------------------------------------------------------*/
486    /*------------------------------------------------------------------------------*/
487    /* PHASE III - insertions */
488    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
489    // memory in next flush. This is causing the CF! to be flushed to memory twice.
490    for (int i = 1200; i < 8000; i++) {
491      region.put(createPut(1, i));
492    }
493
494    // CF1 should be flatten and merged so wait here to be sure the compaction is done
495    while (
496      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
497        .isMemStoreFlushingInMemory()
498    ) {
499      Threads.sleep(10);
500    }
501
502    /*------------------------------------------------------------------------------*/
503    /* PHASE III - collect sizes */
504    // How much does the CF1 memstore occupy now? Will be used later.
505    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
506    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
507
508    /*------------------------------------------------------------------------------*/
509    /* PHASE III - validation */
510    // The total memstore size should be the same as the sum of the sizes of
511    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
512    // items in CF1/2
513    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
514      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
515
516    /*------------------------------------------------------------------------------*/
517    /* PHASE III - Flush */
518    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
519    // CF1 is flushed to disk, but not entirely emptied.
520    // CF2 was and remained empty, same way nothing happens to CF3
521    region.flush(false);
522
523    /*------------------------------------------------------------------------------*/
524    /*------------------------------------------------------------------------------*/
525    /* PHASE IV - collect sizes */
526    // Recalculate everything
527    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
528    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
529    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
530    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
531      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
532    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
533
534    /*------------------------------------------------------------------------------*/
535    /* PHASE IV - validation */
536    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
537    // CF2 should remain empty
538    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
539    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
540    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
541    // CF3 shouldn't have been touched.
542    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
543    // the smallest LSN of CF3 shouldn't change
544    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
545    // CF3 should be bottleneck for WAL
546    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
547
548    /*------------------------------------------------------------------------------*/
549    /* PHASE IV - Flush */
550    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
551    // Force flush to disk on all memstores (flush parameter true).
552    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
553    region.flush(true);
554
555    /*------------------------------------------------------------------------------*/
556    /*------------------------------------------------------------------------------*/
557    /* PHASE V - collect sizes */
558    // Recalculate everything
559    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
560    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
561    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
562    long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
563      .getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
564    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
565
566    /*------------------------------------------------------------------------------*/
567    /* PHASE V - validation */
568    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
569    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
570    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
571    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
572    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
573    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
574    // The total memstores size should be empty
575    assertEquals(0, totalMemstoreSizePhaseV);
576    // Because there is nothing in any memstore the WAL's LSN should be -1
577    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
578
579    // What happens when we hit the memstore limit, but we are not able to find
580    // any Column Family above the threshold?
581    // In that case, we should flush all the CFs.
582
583    /*------------------------------------------------------------------------------*/
584    /*------------------------------------------------------------------------------*/
585    /* PHASE VI - insertions */
586    // The memstore limit is 200*1024 and the column family flush threshold is
587    // around 50*1024. We try to just hit the memstore limit with each CF's
588    // memstore being below the CF flush threshold.
589    for (int i = 1; i <= 300; i++) {
590      region.put(createPut(1, i));
591      region.put(createPut(2, i));
592      region.put(createPut(3, i));
593      region.put(createPut(4, i));
594      region.put(createPut(5, i));
595    }
596
597    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
598    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
599    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
600
601    /*------------------------------------------------------------------------------*/
602    /* PHASE VI - Flush */
603    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
604    // None among compacting memstores was flushed to memory due to previous puts.
605    // But is going to be moved to pipeline and flatten due to the flush.
606    region.flush(false);
607    // Since we won't find any CF above the threshold, and hence no specific
608    // store to flush, we should flush all the memstores
609    // Also compacted memstores are flushed to disk, but not entirely emptied
610    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
611    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
612    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
613
614    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
615    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
616    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
617
618    HBaseTestingUtil.closeRegionAndWAL(region);
619  }
620
621  @Test
622  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
623    // Set up the configuration
624    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
625    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
626    // set memstore to do data compaction and not to use the speculative scan
627    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
628      String.valueOf(MemoryCompactionPolicy.EAGER));
629
630    // Intialize the HRegion
631    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
632    verifyInMemoryFlushSize(region);
633    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
634    for (int i = 1; i <= 1200; i++) {
635      region.put(createPut(1, i));
636      if (i <= 100) {
637        region.put(createPut(2, i));
638        if (i <= 50) {
639          region.put(createPut(3, i));
640        }
641      }
642    }
643    // Now add more puts for CF2, so that we only flush CF2 to disk
644    for (int i = 100; i < 2000; i++) {
645      region.put(createPut(2, i));
646    }
647
648    // in this test check the non-composite snapshot - flashing only tail of the pipeline
649    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
650    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
651
652    long totalMemstoreSize = region.getMemStoreDataSize();
653
654    // Find the sizes of the memstores of each CF.
655    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
656    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
657    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
658
659    // Some other sanity checks.
660    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
661    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
662    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
663
664    // The total memstore size should be the same as the sum of the sizes of
665    // memstores of CF1, CF2 and CF3.
666    String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="
667      + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI
668      + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="
669      + cf3MemstoreSizePhaseI;
670    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
671      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize(), msg);
672
673    // Flush!
674    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
675    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
676    cms1.flushInMemory();
677    cms3.flushInMemory();
678    region.flush(false);
679
680    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
681
682    long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
683      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
684    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
685    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
686    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
687
688    // CF2 should have been cleared
689    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
690    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
691
692    String s = "\n\n----------------------------------\n"
693      + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
694      + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII
695      + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
696
697    // Add same entries to compact them later
698    for (int i = 1; i <= 1200; i++) {
699      region.put(createPut(1, i));
700      if (i <= 100) {
701        region.put(createPut(2, i));
702        if (i <= 50) {
703          region.put(createPut(3, i));
704        }
705      }
706    }
707    // Now add more puts for CF2, so that we only flush CF2 to disk
708    for (int i = 100; i < 2000; i++) {
709      region.put(createPut(2, i));
710    }
711
712    long smallestSeqInRegionCurrentMemstorePhaseIII = AbstractTestFSWAL
713      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
714    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
715    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
716    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
717
718    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
719      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", "
720      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
721      + smallestSeqCF3PhaseIII + "\n";
722
723    // Flush!
724    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
725    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
726    cms1.flushInMemory();
727    cms3.flushInMemory();
728    region.flush(false);
729
730    long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
731      .getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
732    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
733    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
734    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
735
736    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
737      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
738      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
739      + smallestSeqCF3PhaseIV + "\n";
740
741    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
742    assertTrue(
743      smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII, s);
744    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
745    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
746
747    HBaseTestingUtil.closeRegionAndWAL(region);
748  }
749
750  @Test
751  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
752    // Set up the configuration
753    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
754    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
755    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
756    // set memstore to do index compaction with merge
757    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
758      String.valueOf(MemoryCompactionPolicy.BASIC));
759    // length of pipeline that requires merge
760    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
761
762    // Intialize the HRegion
763    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
764    verifyInMemoryFlushSize(region);
765    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
766    for (int i = 1; i <= 1200; i++) {
767      region.put(createPut(1, i));
768      if (i <= 100) {
769        region.put(createPut(2, i));
770        if (i <= 50) {
771          region.put(createPut(3, i));
772        }
773      }
774    }
775    // Now put more entries to CF2
776    for (int i = 100; i < 2000; i++) {
777      region.put(createPut(2, i));
778    }
779
780    long totalMemstoreSize = region.getMemStoreDataSize();
781
782    // test in-memory flashing into CAM here
783    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
784      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
785    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
786      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
787
788    // Find the sizes of the memstores of each CF.
789    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
790    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
791    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
792
793    // Some other sanity checks.
794    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
795    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
796    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
797
798    // The total memstore size should be the same as the sum of the sizes of
799    // memstores of CF1, CF2 and CF3.
800    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
801      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
802
803    // Initiate in-memory Flush!
804    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
805    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
806    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
807    while (
808      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
809        .isMemStoreFlushingInMemory()
810    ) {
811      Threads.sleep(10);
812    }
813    while (
814      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
815        .isMemStoreFlushingInMemory()
816    ) {
817      Threads.sleep(10);
818    }
819
820    // Flush-to-disk! CF2 only should be flushed
821    region.flush(false);
822
823    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
824    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
825    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
826
827    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
828    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
829    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
830    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
831    // CF2 should have been cleared
832    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
833
834    // Add the same amount of entries to see the merging
835    for (int i = 1; i <= 1200; i++) {
836      region.put(createPut(1, i));
837      if (i <= 100) {
838        region.put(createPut(2, i));
839        if (i <= 50) {
840          region.put(createPut(3, i));
841        }
842      }
843    }
844    // Now add more puts for CF2, so that we only flush CF2 to disk
845    for (int i = 100; i < 2000; i++) {
846      region.put(createPut(2, i));
847    }
848
849    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
850
851    // Flush in memory!
852    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
853    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
854    // CF1 and CF3 should be merged so wait here to be sure the merge is done
855    while (
856      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
857        .isMemStoreFlushingInMemory()
858    ) {
859      Threads.sleep(10);
860    }
861    while (
862      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
863        .isMemStoreFlushingInMemory()
864    ) {
865      Threads.sleep(10);
866    }
867    region.flush(false);
868
869    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
870    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
871
872    assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
873    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
874    // should be the same in flattening and in merge (first and second in-memory-flush)
875    // but in phase 1 we do not yet have immutable segment
876    assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
877      cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
878        - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
879    assertEquals(3, // active, one in pipeline, snapshot
880      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size());
881    // CF2 should have been cleared
882    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize(),
883      "\n<<< DEBUG: The data--heap sizes of stores before/after first flushes," + " CF1: "
884        + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize() + "--"
885        + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize()
886        + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
887        + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
888        + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
889        + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
890        + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
891        + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/"
892        + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/"
893        + cf1MemstoreSizePhaseIV.getHeapSize() + "\n");
894
895    HBaseTestingUtil.closeRegionAndWAL(region);
896  }
897
898  // should end in 300 seconds (5 minutes)
899  @Test
900  public void testStressFlushAndWALinIndexCompaction() throws IOException {
901    // Set up the configuration
902    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
903    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
904      200 * 1024);
905    // set memstore to do data compaction and not to use the speculative scan
906    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
907      String.valueOf(MemoryCompactionPolicy.BASIC));
908
909    // Successfully initialize the HRegion
910    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
911    verifyInMemoryFlushSize(region);
912    Thread[] threads = new Thread[25];
913    for (int i = 0; i < threads.length; i++) {
914      int id = i * 10000;
915      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
916      threads[i] = new Thread(runnable);
917      threads[i].start();
918    }
919    Threads.sleep(10000); // let other threads start
920    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
921    Threads.sleep(10000); // let other threads continue
922    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
923
924    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
925    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
926    while (
927      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
928        .isMemStoreFlushingInMemory()
929    ) {
930      Threads.sleep(10);
931    }
932    while (
933      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
934        .isMemStoreFlushingInMemory()
935    ) {
936      Threads.sleep(10);
937    }
938
939    for (int i = 0; i < threads.length; i++) {
940      try {
941        threads[i].join();
942      } catch (InterruptedException e) {
943        e.printStackTrace();
944      }
945    }
946  }
947
948  /**
949   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
950   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
951   * releases updatesLock and compacts the pipeline.
952   */
953  private class ConcurrentPutRunnable implements Runnable {
954    private final HRegion stressedRegion;
955    private final int startNumber;
956
957    ConcurrentPutRunnable(HRegion r, int i) {
958      this.stressedRegion = r;
959      this.startNumber = i;
960    }
961
962    @Override
963    public void run() {
964
965      try {
966        int dummy = startNumber / 10000;
967        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
968        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
969        for (int i = startNumber; i <= startNumber + 3000; i++) {
970          stressedRegion.put(createPut(1, i));
971          if (i <= startNumber + 2000) {
972            stressedRegion.put(createPut(2, i));
973            if (i <= startNumber + 1000) {
974              stressedRegion.put(createPut(3, i));
975            }
976          }
977        }
978        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
979        // Now add more puts for CF2, so that we only flush CF2 to disk
980        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
981          stressedRegion.put(createPut(2, i));
982        }
983        // And add more puts for CF1
984        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
985          stressedRegion.put(createPut(1, i));
986        }
987        System.out.print("Thread with start number " + startNumber + " flushes\n");
988        // flush (IN MEMORY) one of the stores (each thread flushes different store)
989        // and wait till the flush and the following action are done
990        if (startNumber == 0) {
991          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
992            .flushInMemory();
993          while (
994            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
995              .isMemStoreFlushingInMemory()
996          ) {
997            Threads.sleep(10);
998          }
999        }
1000        if (startNumber == 10000) {
1001          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1002            .flushInMemory();
1003          while (
1004            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1005              .isMemStoreFlushingInMemory()
1006          ) {
1007            Threads.sleep(10);
1008          }
1009        }
1010        if (startNumber == 20000) {
1011          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1012            .flushInMemory();
1013          while (
1014            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1015              .isMemStoreFlushingInMemory()
1016          ) {
1017            Threads.sleep(10);
1018          }
1019        }
1020        System.out.print("Thread with start number " + startNumber + " finishes\n");
1021      } catch (IOException e) {
1022        assert false;
1023      }
1024    }
1025  }
1026
1027  private WAL getWAL(Region region) {
1028    return ((HRegion) region).getWAL();
1029  }
1030}