001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.MemoryCompactionPolicy;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.mockito.Mockito;
049
050/**
051 * This test verifies the correctness of the Per Column Family flushing strategy when part of the
052 * memstores are compacted memstores
053 */
054@Category({ RegionServerTests.class, LargeTests.class })
055public class TestWalAndCompactingMemStoreFlush {
056
  /** HBase class-level test rule registered for this test class. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class);

  /** Shared testing utility; supplies the base configuration and the data test directory. */
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Parent directory of the regions created by initHRegion (one sub-directory per calling method).
  // NOTE(review): the name "TestHRegion" looks copied from another test class - confirm whether
  // it is intentional.
  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
  /** Table that every region created by this test belongs to. */
  public static final TableName TABLENAME =
    TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");

  // The five column families f1..f5 of the test table. initHRegion gives the families at an even
  // array index (f1, f3, f5) the configured in-memory compaction policy and the others NONE.
  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  // Shorthands for the first three families, referred to as CF1, CF2 and CF3 in the tests.
  public static final byte[] FAMILY1 = FAMILIES[0];
  public static final byte[] FAMILY2 = FAMILIES[1];
  public static final byte[] FAMILY3 = FAMILIES[2];

  /** Per-test configuration; re-created from the utility's configuration in setup(). */
  private Configuration conf;
074
075  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
076    int i = 0;
077    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
078    for (byte[] family : FAMILIES) {
079      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
080      // even column families are going to have compacted memstore
081      if (i % 2 == 0) {
082        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy
083          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
084      } else {
085        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
086      }
087      builder.setColumnFamily(cfBuilder.build());
088      i++;
089    }
090
091    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
092    Path path = new Path(DIR, callingMethod);
093    HRegion region = HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false);
094    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
095    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
096    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
097    region.initialize(null);
098    return region;
099  }
100
101  // A helper function to create puts.
102  private Put createPut(int familyNum, int putNum) {
103    byte[] qf = Bytes.toBytes("q" + familyNum);
104    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
105    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
106    Put p = new Put(row);
107    p.addColumn(FAMILIES[familyNum - 1], qf, val);
108    return p;
109  }
110
111  // A helper function to create double puts, so something can be compacted later.
112  private Put createDoublePut(int familyNum, int putNum) {
113    byte[] qf = Bytes.toBytes("q" + familyNum);
114    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
115    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
116    Put p = new Put(row);
117    // add twice with different timestamps
118    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
119    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
120    return p;
121  }
122
123  private void verifyInMemoryFlushSize(Region region) {
124    assertEquals(
125      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
126      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
127  }
128
129  @Before
130  public void setup() {
131    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
132    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
133      FlushNonSloppyStoresFirstPolicy.class.getName());
134    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
135  }
136
137  @Test
138  public void testSelectiveFlushWithEager() throws IOException {
139    // Set up the configuration
140    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
141    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
142    // set memstore to do data compaction
143    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
144      String.valueOf(MemoryCompactionPolicy.EAGER));
145
146    // Intialize the region
147    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
148    verifyInMemoryFlushSize(region);
149    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
150    for (int i = 1; i <= 1200; i++) {
151      region.put(createPut(1, i)); // compacted memstore, all the keys are unique
152
153      if (i <= 100) {
154        region.put(createPut(2, i));
155        if (i <= 50) {
156          // compacted memstore, subject for compaction due to duplications
157          region.put(createDoublePut(3, i));
158        }
159      }
160    }
161
162    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
163    for (int i = 100; i < 2000; i++) {
164      region.put(createPut(2, i));
165    }
166
167    long totalMemstoreSize = region.getMemStoreDataSize();
168
169    // Find the smallest LSNs for edits wrt to each CF.
170    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
171    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
172    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
173
174    // Find the sizes of the memstores of each CF.
175    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
176    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
177    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
178
179    // Get the overall smallest LSN in the region's memstores.
180    long smallestSeqInRegionCurrentMemstorePhaseI =
181      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
182
183    String s = "\n\n----------------------------------\n"
184      + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI
185      + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore()
186      + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
187      + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
188      + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
189
190    // The overall smallest LSN in the region's memstores should be the same as
191    // the LSN of the smallest edit in CF1
192    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
193
194    // Some other sanity checks.
195    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
196    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
197    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
198    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
199    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
200
201    // The total memstore size should be the same as the sum of the sizes of
202    // memstores of CF1, CF2 and CF3.
203    String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI="
204      + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI
205      + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
206    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
207      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
208
209    // Flush!!!!!!!!!!!!!!!!!!!!!!
210    // We have big compacting memstore CF1 and two small memstores:
211    // CF2 (not compacted) and CF3 (compacting)
212    // All together they are above the flush size lower bound.
213    // Since CF1 and CF3 should be flushed to memory (not to disk),
214    // CF2 is going to be flushed to disk.
215    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
216    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
217    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
218    cms1.flushInMemory();
219    cms3.flushInMemory();
220    region.flush(false);
221
222    // Recalculate everything
223    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
224    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
225    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
226
227    long smallestSeqInRegionCurrentMemstorePhaseII =
228      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
229    // Find the smallest LSNs for edits wrt to each CF.
230    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
231    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
232    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
233
234    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
235      + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
236      + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
237
238    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
239    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
240    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
241
242    // CF2 should become empty
243    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
244    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
245
246    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
247    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
248    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
249
250    // Now the smallest LSN in the region should be the same as the smallest
251    // LSN in the memstore of CF1.
252    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
253
254    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
255    // memory in next flush
256    for (int i = 1200; i < 3000; i++) {
257      region.put(createPut(1, i));
258    }
259
260    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
261      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", "
262      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
263      + smallestSeqCF3PhaseII + "\n";
264
265    // How much does the CF1 memstore occupy? Will be used later.
266    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
267    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
268
269    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
270      + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n";
271
272    // Flush!!!!!!!!!!!!!!!!!!!!!!
273    // Flush again, CF1 is flushed to disk
274    // CF2 is flushed to disk, because it is not in-memory compacted memstore
275    // CF3 is flushed empty to memory (actually nothing happens to CF3)
276    region.flush(false);
277
278    // Recalculate everything
279    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
280    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
281    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
282
283    long smallestSeqInRegionCurrentMemstorePhaseIV =
284      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
285    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
286    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
287    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
288
289    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
290      + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n";
291
292    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
293      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
294      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
295      + smallestSeqCF3PhaseIV + "\n";
296
297    // CF1's pipeline component (inserted before first flush) should be flushed to disk
298    // CF2 should be flushed to disk
299    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
300    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
301    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
302
303    // CF3 shouldn't have been touched.
304    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
305
306    // the smallest LSN of CF3 shouldn't change
307    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
308
309    // CF3 should be bottleneck for WAL
310    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
311
312    // Flush!!!!!!!!!!!!!!!!!!!!!!
313    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
314    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
315    region.flush(true);
316
317    // Recalculate everything
318    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
319    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
320    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
321    long smallestSeqInRegionCurrentMemstorePhaseV =
322      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
323
324    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
325    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
326    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
327    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
328    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
329    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
330
331    // What happens when we hit the memstore limit, but we are not able to find
332    // any Column Family above the threshold?
333    // In that case, we should flush all the CFs.
334
335    // The memstore limit is 100*1024 and the column family flush threshold is
336    // around 25*1024. We try to just hit the memstore limit with each CF's
337    // memstore being below the CF flush threshold.
338    for (int i = 1; i <= 300; i++) {
339      region.put(createPut(1, i));
340      region.put(createPut(2, i));
341      region.put(createPut(3, i));
342      region.put(createPut(4, i));
343      region.put(createPut(5, i));
344    }
345
346    region.flush(false);
347
348    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
349      + smallestSeqInRegionCurrentMemstorePhaseV
350      + ". After additional inserts and last flush, the entire region size is:"
351      + region.getMemStoreDataSize() + "\n----------------------------------\n";
352
353    // Since we won't find any CF above the threshold, and hence no specific
354    // store to flush, we should flush all the memstores
355    // Also compacted memstores are flushed to disk.
356    assertEquals(0, region.getMemStoreDataSize());
357    System.out.println(s);
358    HBaseTestingUtil.closeRegionAndWAL(region);
359  }
360
361  /*------------------------------------------------------------------------------*/
362  /* Check the same as above but for index-compaction type of compacting memstore */
363  @Test
364  public void testSelectiveFlushWithIndexCompaction() throws IOException {
365    /*------------------------------------------------------------------------------*/
366    /* SETUP */
367    // Set up the configuration
368    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
369    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
370    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
371    // set memstore to index-compaction
372    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
373      String.valueOf(MemoryCompactionPolicy.BASIC));
374
375    // Initialize the region
376    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
377    verifyInMemoryFlushSize(region);
378    /*------------------------------------------------------------------------------*/
379    /* PHASE I - insertions */
380    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
381    for (int i = 1; i <= 1200; i++) {
382      region.put(createPut(1, i)); // compacted memstore
383      if (i <= 100) {
384        region.put(createPut(2, i));
385        if (i <= 50) {
386          region.put(createDoublePut(3, i)); // subject for in-memory compaction
387        }
388      }
389    }
390    // Now add more puts for CF2, so that we only flush CF2 to disk
391    for (int i = 100; i < 2000; i++) {
392      region.put(createPut(2, i));
393    }
394
395    /*------------------------------------------------------------------------------*/
396    /*------------------------------------------------------------------------------*/
397    /* PHASE I - collect sizes */
398    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
399    // Find the smallest LSNs for edits wrt to each CF.
400    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
401    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
402    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
403    // Find the sizes of the memstores of each CF.
404    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
405    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
406    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
407    // Get the overall smallest LSN in the region's memstores.
408    long smallestSeqInRegionCurrentMemstorePhaseI =
409      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
410
411    /*------------------------------------------------------------------------------*/
412    /* PHASE I - validation */
413    // The overall smallest LSN in the region's memstores should be the same as
414    // the LSN of the smallest edit in CF1
415    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
416    // Some other sanity checks.
417    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
418    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
419    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
420    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
421    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
422
423    // The total memstore size should be the same as the sum of the sizes of
424    // memstores of CF1, CF2 and CF3.
425    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
426      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
427
428    /*------------------------------------------------------------------------------*/
429    /* PHASE I - Flush */
430    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
431    // CF1, CF2, CF3, all together they are above the flush size lower bound.
432    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
433    // CF1 and CF3 - flushed to memory and flatten explicitly
434    region.flush(false);
435    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
436    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
437    cms1.flushInMemory();
438    cms3.flushInMemory();
439
440    // CF3/CF1 should be merged so wait here to be sure the compaction is done
441    while (
442      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
443        .isMemStoreFlushingInMemory()
444    ) {
445      Threads.sleep(10);
446    }
447    while (
448      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
449        .isMemStoreFlushingInMemory()
450    ) {
451      Threads.sleep(10);
452    }
453
454    /*------------------------------------------------------------------------------*/
455    /*------------------------------------------------------------------------------*/
456    /* PHASE II - collect sizes */
457    // Recalculate everything
458    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
459    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
460    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
461    long smallestSeqInRegionCurrentMemstorePhaseII =
462      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
463    // Find the smallest LSNs for edits wrt to each CF.
464    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
465    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
466
467    /*------------------------------------------------------------------------------*/
468    /* PHASE II - validation */
469    // CF1 was flushed to memory, should be flattened and take less space
470    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
471    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
472    // CF2 should become empty
473    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
474    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
475    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
476    // if compacted CF# should be at least twice less because its every key was duplicated
477    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
478    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
479
480    // Now the smallest LSN in the region should be the same as the smallest
481    // LSN in the memstore of CF1.
482    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
483    // The total memstore size should be the same as the sum of the sizes of
484    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
485    // items in CF1/2
486    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
487      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
488
489    /*------------------------------------------------------------------------------*/
490    /*------------------------------------------------------------------------------*/
491    /* PHASE III - insertions */
492    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
493    // memory in next flush. This is causing the CF! to be flushed to memory twice.
494    for (int i = 1200; i < 8000; i++) {
495      region.put(createPut(1, i));
496    }
497
498    // CF1 should be flatten and merged so wait here to be sure the compaction is done
499    while (
500      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
501        .isMemStoreFlushingInMemory()
502    ) {
503      Threads.sleep(10);
504    }
505
506    /*------------------------------------------------------------------------------*/
507    /* PHASE III - collect sizes */
508    // How much does the CF1 memstore occupy now? Will be used later.
509    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
510    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
511
512    /*------------------------------------------------------------------------------*/
513    /* PHASE III - validation */
514    // The total memstore size should be the same as the sum of the sizes of
515    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
516    // items in CF1/2
517    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
518      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
519
520    /*------------------------------------------------------------------------------*/
521    /* PHASE III - Flush */
522    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
523    // CF1 is flushed to disk, but not entirely emptied.
524    // CF2 was and remained empty, same way nothing happens to CF3
525    region.flush(false);
526
527    /*------------------------------------------------------------------------------*/
528    /*------------------------------------------------------------------------------*/
529    /* PHASE IV - collect sizes */
530    // Recalculate everything
531    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
532    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
533    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
534    long smallestSeqInRegionCurrentMemstorePhaseIV =
535      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
536    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
537
538    /*------------------------------------------------------------------------------*/
539    /* PHASE IV - validation */
540    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
541    // CF2 should remain empty
542    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
543    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
544    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
545    // CF3 shouldn't have been touched.
546    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
547    // the smallest LSN of CF3 shouldn't change
548    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
549    // CF3 should be bottleneck for WAL
550    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
551
552    /*------------------------------------------------------------------------------*/
553    /* PHASE IV - Flush */
554    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
555    // Force flush to disk on all memstores (flush parameter true).
556    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
557    region.flush(true);
558
559    /*------------------------------------------------------------------------------*/
560    /*------------------------------------------------------------------------------*/
561    /* PHASE V - collect sizes */
562    // Recalculate everything
563    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
564    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
565    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
566    long smallestSeqInRegionCurrentMemstorePhaseV =
567      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
568    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
569
570    /*------------------------------------------------------------------------------*/
571    /* PHASE V - validation */
572    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
573    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
574    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
575    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
576    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
577    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
578    // The total memstores size should be empty
579    assertEquals(0, totalMemstoreSizePhaseV);
580    // Because there is nothing in any memstore the WAL's LSN should be -1
581    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
582
583    // What happens when we hit the memstore limit, but we are not able to find
584    // any Column Family above the threshold?
585    // In that case, we should flush all the CFs.
586
587    /*------------------------------------------------------------------------------*/
588    /*------------------------------------------------------------------------------*/
589    /* PHASE VI - insertions */
590    // The memstore limit is 200*1024 and the column family flush threshold is
591    // around 50*1024. We try to just hit the memstore limit with each CF's
592    // memstore being below the CF flush threshold.
593    for (int i = 1; i <= 300; i++) {
594      region.put(createPut(1, i));
595      region.put(createPut(2, i));
596      region.put(createPut(3, i));
597      region.put(createPut(4, i));
598      region.put(createPut(5, i));
599    }
600
601    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
602    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
603    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
604
605    /*------------------------------------------------------------------------------*/
606    /* PHASE VI - Flush */
607    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
608    // None among compacting memstores was flushed to memory due to previous puts.
609    // But is going to be moved to pipeline and flatten due to the flush.
610    region.flush(false);
611    // Since we won't find any CF above the threshold, and hence no specific
612    // store to flush, we should flush all the memstores
613    // Also compacted memstores are flushed to disk, but not entirely emptied
614    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
615    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
616    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
617
618    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
619    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
620    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
621
622    HBaseTestingUtil.closeRegionAndWAL(region);
623  }
624
625  @Test
626  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
627    // Set up the configuration
628    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
629    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
630    // set memstore to do data compaction and not to use the speculative scan
631    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
632      String.valueOf(MemoryCompactionPolicy.EAGER));
633
634    // Intialize the HRegion
635    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
636    verifyInMemoryFlushSize(region);
637    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
638    for (int i = 1; i <= 1200; i++) {
639      region.put(createPut(1, i));
640      if (i <= 100) {
641        region.put(createPut(2, i));
642        if (i <= 50) {
643          region.put(createPut(3, i));
644        }
645      }
646    }
647    // Now add more puts for CF2, so that we only flush CF2 to disk
648    for (int i = 100; i < 2000; i++) {
649      region.put(createPut(2, i));
650    }
651
652    // in this test check the non-composite snapshot - flashing only tail of the pipeline
653    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
654    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
655
656    long totalMemstoreSize = region.getMemStoreDataSize();
657
658    // Find the sizes of the memstores of each CF.
659    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
660    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
661    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
662
663    // Some other sanity checks.
664    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
665    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
666    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
667
668    // The total memstore size should be the same as the sum of the sizes of
669    // memstores of CF1, CF2 and CF3.
670    String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="
671      + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI
672      + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="
673      + cf3MemstoreSizePhaseI;
674    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
675      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
676
677    // Flush!
678    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
679    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
680    cms1.flushInMemory();
681    cms3.flushInMemory();
682    region.flush(false);
683
684    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
685
686    long smallestSeqInRegionCurrentMemstorePhaseII =
687      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
688    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
689    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
690    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
691
692    // CF2 should have been cleared
693    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
694    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
695
696    String s = "\n\n----------------------------------\n"
697      + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
698      + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII
699      + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
700
701    // Add same entries to compact them later
702    for (int i = 1; i <= 1200; i++) {
703      region.put(createPut(1, i));
704      if (i <= 100) {
705        region.put(createPut(2, i));
706        if (i <= 50) {
707          region.put(createPut(3, i));
708        }
709      }
710    }
711    // Now add more puts for CF2, so that we only flush CF2 to disk
712    for (int i = 100; i < 2000; i++) {
713      region.put(createPut(2, i));
714    }
715
716    long smallestSeqInRegionCurrentMemstorePhaseIII =
717      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
718    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
719    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
720    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
721
722    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
723      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", "
724      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
725      + smallestSeqCF3PhaseIII + "\n";
726
727    // Flush!
728    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
729    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
730    cms1.flushInMemory();
731    cms3.flushInMemory();
732    region.flush(false);
733
734    long smallestSeqInRegionCurrentMemstorePhaseIV =
735      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
736    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
737    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
738    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
739
740    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
741      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
742      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
743      + smallestSeqCF3PhaseIV + "\n";
744
745    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
746    assertTrue(s,
747      smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
748    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
749    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
750
751    HBaseTestingUtil.closeRegionAndWAL(region);
752  }
753
754  @Test
755  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
756    // Set up the configuration
757    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
758    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
759    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
760    // set memstore to do index compaction with merge
761    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
762      String.valueOf(MemoryCompactionPolicy.BASIC));
763    // length of pipeline that requires merge
764    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
765
766    // Intialize the HRegion
767    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
768    verifyInMemoryFlushSize(region);
769    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
770    for (int i = 1; i <= 1200; i++) {
771      region.put(createPut(1, i));
772      if (i <= 100) {
773        region.put(createPut(2, i));
774        if (i <= 50) {
775          region.put(createPut(3, i));
776        }
777      }
778    }
779    // Now put more entries to CF2
780    for (int i = 100; i < 2000; i++) {
781      region.put(createPut(2, i));
782    }
783
784    long totalMemstoreSize = region.getMemStoreDataSize();
785
786    // test in-memory flashing into CAM here
787    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
788      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
789    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
790      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
791
792    // Find the sizes of the memstores of each CF.
793    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
794    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
795    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
796
797    // Some other sanity checks.
798    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
799    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
800    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
801
802    // The total memstore size should be the same as the sum of the sizes of
803    // memstores of CF1, CF2 and CF3.
804    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
805      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
806
807    // Initiate in-memory Flush!
808    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
809    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
810    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
811    while (
812      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
813        .isMemStoreFlushingInMemory()
814    ) {
815      Threads.sleep(10);
816    }
817    while (
818      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
819        .isMemStoreFlushingInMemory()
820    ) {
821      Threads.sleep(10);
822    }
823
824    // Flush-to-disk! CF2 only should be flushed
825    region.flush(false);
826
827    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
828    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
829    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
830
831    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
832    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
833    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
834    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
835    // CF2 should have been cleared
836    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
837
838    // Add the same amount of entries to see the merging
839    for (int i = 1; i <= 1200; i++) {
840      region.put(createPut(1, i));
841      if (i <= 100) {
842        region.put(createPut(2, i));
843        if (i <= 50) {
844          region.put(createPut(3, i));
845        }
846      }
847    }
848    // Now add more puts for CF2, so that we only flush CF2 to disk
849    for (int i = 100; i < 2000; i++) {
850      region.put(createPut(2, i));
851    }
852
853    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
854
855    // Flush in memory!
856    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
857    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
858    // CF1 and CF3 should be merged so wait here to be sure the merge is done
859    while (
860      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
861        .isMemStoreFlushingInMemory()
862    ) {
863      Threads.sleep(10);
864    }
865    while (
866      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
867        .isMemStoreFlushingInMemory()
868    ) {
869      Threads.sleep(10);
870    }
871    region.flush(false);
872
873    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
874    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
875
876    assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
877    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
878    // should be the same in flattening and in merge (first and second in-memory-flush)
879    // but in phase 1 we do not yet have immutable segment
880    assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
881      cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
882        - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
883    assertEquals(3, // active, one in pipeline, snapshot
884      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size());
885    // CF2 should have been cleared
886    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
887      + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize()
888      + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize()
889      + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize()
890      + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize()
891      + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize()
892      + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize()
893      + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize()
894      + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize()
895      + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize());
896
897    HBaseTestingUtil.closeRegionAndWAL(region);
898  }
899
900  // should end in 300 seconds (5 minutes)
901  @Test
902  public void testStressFlushAndWALinIndexCompaction() throws IOException {
903    // Set up the configuration
904    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
905    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
906      200 * 1024);
907    // set memstore to do data compaction and not to use the speculative scan
908    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
909      String.valueOf(MemoryCompactionPolicy.BASIC));
910
911    // Successfully initialize the HRegion
912    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
913    verifyInMemoryFlushSize(region);
914    Thread[] threads = new Thread[25];
915    for (int i = 0; i < threads.length; i++) {
916      int id = i * 10000;
917      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
918      threads[i] = new Thread(runnable);
919      threads[i].start();
920    }
921    Threads.sleep(10000); // let other threads start
922    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
923    Threads.sleep(10000); // let other threads continue
924    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
925
926    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
927    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
928    while (
929      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
930        .isMemStoreFlushingInMemory()
931    ) {
932      Threads.sleep(10);
933    }
934    while (
935      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
936        .isMemStoreFlushingInMemory()
937    ) {
938      Threads.sleep(10);
939    }
940
941    for (int i = 0; i < threads.length; i++) {
942      try {
943        threads[i].join();
944      } catch (InterruptedException e) {
945        e.printStackTrace();
946      }
947    }
948  }
949
950  /**
951   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
952   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
953   * releases updatesLock and compacts the pipeline.
954   */
955  private class ConcurrentPutRunnable implements Runnable {
956    private final HRegion stressedRegion;
957    private final int startNumber;
958
959    ConcurrentPutRunnable(HRegion r, int i) {
960      this.stressedRegion = r;
961      this.startNumber = i;
962    }
963
964    @Override
965    public void run() {
966
967      try {
968        int dummy = startNumber / 10000;
969        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
970        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
971        for (int i = startNumber; i <= startNumber + 3000; i++) {
972          stressedRegion.put(createPut(1, i));
973          if (i <= startNumber + 2000) {
974            stressedRegion.put(createPut(2, i));
975            if (i <= startNumber + 1000) {
976              stressedRegion.put(createPut(3, i));
977            }
978          }
979        }
980        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
981        // Now add more puts for CF2, so that we only flush CF2 to disk
982        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
983          stressedRegion.put(createPut(2, i));
984        }
985        // And add more puts for CF1
986        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
987          stressedRegion.put(createPut(1, i));
988        }
989        System.out.print("Thread with start number " + startNumber + " flushes\n");
990        // flush (IN MEMORY) one of the stores (each thread flushes different store)
991        // and wait till the flush and the following action are done
992        if (startNumber == 0) {
993          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
994            .flushInMemory();
995          while (
996            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
997              .isMemStoreFlushingInMemory()
998          ) {
999            Threads.sleep(10);
1000          }
1001        }
1002        if (startNumber == 10000) {
1003          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1004            .flushInMemory();
1005          while (
1006            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1007              .isMemStoreFlushingInMemory()
1008          ) {
1009            Threads.sleep(10);
1010          }
1011        }
1012        if (startNumber == 20000) {
1013          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1014            .flushInMemory();
1015          while (
1016            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1017              .isMemStoreFlushingInMemory()
1018          ) {
1019            Threads.sleep(10);
1020          }
1021        }
1022        System.out.print("Thread with start number " + startNumber + " finishes\n");
1023      } catch (IOException e) {
1024        assert false;
1025      }
1026    }
1027  }
1028
1029  private WAL getWAL(Region region) {
1030    return ((HRegion) region).getWAL();
1031  }
1032}