001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HColumnDescriptor;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.HTableDescriptor;
035import org.apache.hadoop.hbase.MemoryCompactionPolicy;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.testclassification.RegionServerTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.Threads;
042import org.apache.hadoop.hbase.wal.WAL;
043import org.junit.Before;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.mockito.Mockito;
048
/**
 * This test verifies the correctness of the per-column-family flushing strategy when some of the
 * memstores are compacting memstores.
 */
053@Category({ RegionServerTests.class, LargeTests.class })
054public class TestWalAndCompactingMemStoreFlush {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class);
059
060  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
061  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
062  public static final TableName TABLENAME =
063    TableName.valueOf("TestWalAndCompactingMemStoreFlush", "t1");
064
065  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
066    Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
067
068  public static final byte[] FAMILY1 = FAMILIES[0];
069  public static final byte[] FAMILY2 = FAMILIES[1];
070  public static final byte[] FAMILY3 = FAMILIES[2];
071
072  private Configuration conf;
073
074  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
075    int i = 0;
076    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
077    for (byte[] family : FAMILIES) {
078      HColumnDescriptor hcd = new HColumnDescriptor(family);
079      // even column families are going to have compacted memstore
080      if (i % 2 == 0) {
081        hcd.setInMemoryCompaction(MemoryCompactionPolicy
082          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
083      } else {
084        hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
085      }
086      htd.addFamily(hcd);
087      i++;
088    }
089
090    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
091    Path path = new Path(DIR, callingMethod);
092    HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false);
093    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
094    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
095    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
096    region.initialize(null);
097    return region;
098  }
099
100  // A helper function to create puts.
101  private Put createPut(int familyNum, int putNum) {
102    byte[] qf = Bytes.toBytes("q" + familyNum);
103    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
104    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
105    Put p = new Put(row);
106    p.addColumn(FAMILIES[familyNum - 1], qf, val);
107    return p;
108  }
109
110  // A helper function to create double puts, so something can be compacted later.
111  private Put createDoublePut(int familyNum, int putNum) {
112    byte[] qf = Bytes.toBytes("q" + familyNum);
113    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
114    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
115    Put p = new Put(row);
116    // add twice with different timestamps
117    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
118    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
119    return p;
120  }
121
122  private void verifyInMemoryFlushSize(Region region) {
123    assertEquals(
124      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
125      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
126  }
127
128  @Before
129  public void setup() {
130    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
131    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
132      FlushNonSloppyStoresFirstPolicy.class.getName());
133    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
134  }
135
136  @Test
137  public void testSelectiveFlushWithEager() throws IOException {
138    // Set up the configuration
139    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
140    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
141    // set memstore to do data compaction
142    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
143      String.valueOf(MemoryCompactionPolicy.EAGER));
144
145    // Intialize the region
146    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
147    verifyInMemoryFlushSize(region);
148    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
149    for (int i = 1; i <= 1200; i++) {
150      region.put(createPut(1, i)); // compacted memstore, all the keys are unique
151
152      if (i <= 100) {
153        region.put(createPut(2, i));
154        if (i <= 50) {
155          // compacted memstore, subject for compaction due to duplications
156          region.put(createDoublePut(3, i));
157        }
158      }
159    }
160
161    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
162    for (int i = 100; i < 2000; i++) {
163      region.put(createPut(2, i));
164    }
165
166    long totalMemstoreSize = region.getMemStoreDataSize();
167
168    // Find the smallest LSNs for edits wrt to each CF.
169    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
170    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
171    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
172
173    // Find the sizes of the memstores of each CF.
174    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
175    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
176    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
177
178    // Get the overall smallest LSN in the region's memstores.
179    long smallestSeqInRegionCurrentMemstorePhaseI =
180      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
181
182    String s = "\n\n----------------------------------\n"
183      + "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI
184      + ", is CF1 compacted memstore?:" + region.getStore(FAMILY1).isSloppyMemStore()
185      + ". Size of CF2 is:" + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
186      + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:" + cf3MemstoreSizePhaseI
187      + ", is CF3 compacted memstore?:" + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
188
189    // The overall smallest LSN in the region's memstores should be the same as
190    // the LSN of the smallest edit in CF1
191    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
192
193    // Some other sanity checks.
194    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
195    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
196    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
197    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
198    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
199
200    // The total memstore size should be the same as the sum of the sizes of
201    // memstores of CF1, CF2 and CF3.
202    String msg = "totalMemstoreSize=" + totalMemstoreSize + " cf1MemstoreSizePhaseI="
203      + cf1MemstoreSizePhaseI + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI
204      + " cf3MemstoreSizePhaseI=" + cf3MemstoreSizePhaseI;
205    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
206      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
207
208    // Flush!!!!!!!!!!!!!!!!!!!!!!
209    // We have big compacting memstore CF1 and two small memstores:
210    // CF2 (not compacted) and CF3 (compacting)
211    // All together they are above the flush size lower bound.
212    // Since CF1 and CF3 should be flushed to memory (not to disk),
213    // CF2 is going to be flushed to disk.
214    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
215    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
216    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
217    cms1.flushInMemory();
218    cms3.flushInMemory();
219    region.flush(false);
220
221    // Recalculate everything
222    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
223    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
224    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
225
226    long smallestSeqInRegionCurrentMemstorePhaseII =
227      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
228    // Find the smallest LSNs for edits wrt to each CF.
229    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
230    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
231    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
232
233    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
234      + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
235      + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
236
237    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
238    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
239    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
240
241    // CF2 should become empty
242    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
243    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
244
245    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
246    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
247    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
248
249    // Now the smallest LSN in the region should be the same as the smallest
250    // LSN in the memstore of CF1.
251    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
252
253    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
254    // memory in next flush
255    for (int i = 1200; i < 3000; i++) {
256      region.put(createPut(1, i));
257    }
258
259    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
260      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", "
261      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseII + ", the smallest sequence in CF3:"
262      + smallestSeqCF3PhaseII + "\n";
263
264    // How much does the CF1 memstore occupy? Will be used later.
265    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
266    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
267
268    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
269      + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n";
270
271    // Flush!!!!!!!!!!!!!!!!!!!!!!
272    // Flush again, CF1 is flushed to disk
273    // CF2 is flushed to disk, because it is not in-memory compacted memstore
274    // CF3 is flushed empty to memory (actually nothing happens to CF3)
275    region.flush(false);
276
277    // Recalculate everything
278    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
279    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
280    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
281
282    long smallestSeqInRegionCurrentMemstorePhaseIV =
283      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
284    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
285    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
286    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
287
288    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
289      + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV + "\n";
290
291    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
292      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
293      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
294      + smallestSeqCF3PhaseIV + "\n";
295
296    // CF1's pipeline component (inserted before first flush) should be flushed to disk
297    // CF2 should be flushed to disk
298    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
299    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
300    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
301
302    // CF3 shouldn't have been touched.
303    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
304
305    // the smallest LSN of CF3 shouldn't change
306    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
307
308    // CF3 should be bottleneck for WAL
309    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
310
311    // Flush!!!!!!!!!!!!!!!!!!!!!!
312    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
313    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
314    region.flush(true);
315
316    // Recalculate everything
317    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
318    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
319    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
320    long smallestSeqInRegionCurrentMemstorePhaseV =
321      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
322
323    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
324    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
325    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
326    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
327    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
328    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
329
330    // What happens when we hit the memstore limit, but we are not able to find
331    // any Column Family above the threshold?
332    // In that case, we should flush all the CFs.
333
334    // The memstore limit is 100*1024 and the column family flush threshold is
335    // around 25*1024. We try to just hit the memstore limit with each CF's
336    // memstore being below the CF flush threshold.
337    for (int i = 1; i <= 300; i++) {
338      region.put(createPut(1, i));
339      region.put(createPut(2, i));
340      region.put(createPut(3, i));
341      region.put(createPut(4, i));
342      region.put(createPut(5, i));
343    }
344
345    region.flush(false);
346
347    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
348      + smallestSeqInRegionCurrentMemstorePhaseV
349      + ". After additional inserts and last flush, the entire region size is:"
350      + region.getMemStoreDataSize() + "\n----------------------------------\n";
351
352    // Since we won't find any CF above the threshold, and hence no specific
353    // store to flush, we should flush all the memstores
354    // Also compacted memstores are flushed to disk.
355    assertEquals(0, region.getMemStoreDataSize());
356    System.out.println(s);
357    HBaseTestingUtility.closeRegionAndWAL(region);
358  }
359
360  /*------------------------------------------------------------------------------*/
361  /* Check the same as above but for index-compaction type of compacting memstore */
362  @Test
363  public void testSelectiveFlushWithIndexCompaction() throws IOException {
364    /*------------------------------------------------------------------------------*/
365    /* SETUP */
366    // Set up the configuration
367    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
368    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
369    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
370    // set memstore to index-compaction
371    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
372      String.valueOf(MemoryCompactionPolicy.BASIC));
373
374    // Initialize the region
375    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
376    verifyInMemoryFlushSize(region);
377    /*------------------------------------------------------------------------------*/
378    /* PHASE I - insertions */
379    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
380    for (int i = 1; i <= 1200; i++) {
381      region.put(createPut(1, i)); // compacted memstore
382      if (i <= 100) {
383        region.put(createPut(2, i));
384        if (i <= 50) {
385          region.put(createDoublePut(3, i)); // subject for in-memory compaction
386        }
387      }
388    }
389    // Now add more puts for CF2, so that we only flush CF2 to disk
390    for (int i = 100; i < 2000; i++) {
391      region.put(createPut(2, i));
392    }
393
394    /*------------------------------------------------------------------------------*/
395    /*------------------------------------------------------------------------------*/
396    /* PHASE I - collect sizes */
397    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
398    // Find the smallest LSNs for edits wrt to each CF.
399    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
400    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
401    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
402    // Find the sizes of the memstores of each CF.
403    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
404    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
405    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
406    // Get the overall smallest LSN in the region's memstores.
407    long smallestSeqInRegionCurrentMemstorePhaseI =
408      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
409
410    /*------------------------------------------------------------------------------*/
411    /* PHASE I - validation */
412    // The overall smallest LSN in the region's memstores should be the same as
413    // the LSN of the smallest edit in CF1
414    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
415    // Some other sanity checks.
416    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
417    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
418    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
419    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
420    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
421
422    // The total memstore size should be the same as the sum of the sizes of
423    // memstores of CF1, CF2 and CF3.
424    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
425      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
426
427    /*------------------------------------------------------------------------------*/
428    /* PHASE I - Flush */
429    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
430    // CF1, CF2, CF3, all together they are above the flush size lower bound.
431    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
432    // CF1 and CF3 - flushed to memory and flatten explicitly
433    region.flush(false);
434    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
435    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
436    cms1.flushInMemory();
437    cms3.flushInMemory();
438
439    // CF3/CF1 should be merged so wait here to be sure the compaction is done
440    while (
441      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
442        .isMemStoreFlushingInMemory()
443    ) {
444      Threads.sleep(10);
445    }
446    while (
447      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
448        .isMemStoreFlushingInMemory()
449    ) {
450      Threads.sleep(10);
451    }
452
453    /*------------------------------------------------------------------------------*/
454    /*------------------------------------------------------------------------------*/
455    /* PHASE II - collect sizes */
456    // Recalculate everything
457    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
458    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
459    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
460    long smallestSeqInRegionCurrentMemstorePhaseII =
461      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
462    // Find the smallest LSNs for edits wrt to each CF.
463    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
464    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
465
466    /*------------------------------------------------------------------------------*/
467    /* PHASE II - validation */
468    // CF1 was flushed to memory, should be flattened and take less space
469    assertEquals(cf1MemstoreSizePhaseII.getDataSize(), cf1MemstoreSizePhaseI.getDataSize());
470    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
471    // CF2 should become empty
472    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
473    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
474    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
475    // if compacted CF# should be at least twice less because its every key was duplicated
476    assertEquals(cf3MemstoreSizePhaseII.getDataSize(), cf3MemstoreSizePhaseI.getDataSize());
477    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
478
479    // Now the smallest LSN in the region should be the same as the smallest
480    // LSN in the memstore of CF1.
481    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
482    // The total memstore size should be the same as the sum of the sizes of
483    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
484    // items in CF1/2
485    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
486      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
487
488    /*------------------------------------------------------------------------------*/
489    /*------------------------------------------------------------------------------*/
490    /* PHASE III - insertions */
491    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
492    // memory in next flush. This is causing the CF! to be flushed to memory twice.
493    for (int i = 1200; i < 8000; i++) {
494      region.put(createPut(1, i));
495    }
496
497    // CF1 should be flatten and merged so wait here to be sure the compaction is done
498    while (
499      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
500        .isMemStoreFlushingInMemory()
501    ) {
502      Threads.sleep(10);
503    }
504
505    /*------------------------------------------------------------------------------*/
506    /* PHASE III - collect sizes */
507    // How much does the CF1 memstore occupy now? Will be used later.
508    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
509    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
510
511    /*------------------------------------------------------------------------------*/
512    /* PHASE III - validation */
513    // The total memstore size should be the same as the sum of the sizes of
514    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
515    // items in CF1/2
516    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
517      + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
518
519    /*------------------------------------------------------------------------------*/
520    /* PHASE III - Flush */
521    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
522    // CF1 is flushed to disk, but not entirely emptied.
523    // CF2 was and remained empty, same way nothing happens to CF3
524    region.flush(false);
525
526    /*------------------------------------------------------------------------------*/
527    /*------------------------------------------------------------------------------*/
528    /* PHASE IV - collect sizes */
529    // Recalculate everything
530    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
531    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
532    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
533    long smallestSeqInRegionCurrentMemstorePhaseIV =
534      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
535    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
536
537    /*------------------------------------------------------------------------------*/
538    /* PHASE IV - validation */
539    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
540    // CF2 should remain empty
541    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
542    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
543    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
544    // CF3 shouldn't have been touched.
545    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
546    // the smallest LSN of CF3 shouldn't change
547    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
548    // CF3 should be bottleneck for WAL
549    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
550
551    /*------------------------------------------------------------------------------*/
552    /* PHASE IV - Flush */
553    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
554    // Force flush to disk on all memstores (flush parameter true).
555    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
556    region.flush(true);
557
558    /*------------------------------------------------------------------------------*/
559    /*------------------------------------------------------------------------------*/
560    /* PHASE V - collect sizes */
561    // Recalculate everything
562    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
563    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
564    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
565    long smallestSeqInRegionCurrentMemstorePhaseV =
566      getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
567    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
568
569    /*------------------------------------------------------------------------------*/
570    /* PHASE V - validation */
571    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
572    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
573    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
574    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
575    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
576    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
577    // The total memstores size should be empty
578    assertEquals(0, totalMemstoreSizePhaseV);
579    // Because there is nothing in any memstore the WAL's LSN should be -1
580    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
581
582    // What happens when we hit the memstore limit, but we are not able to find
583    // any Column Family above the threshold?
584    // In that case, we should flush all the CFs.
585
586    /*------------------------------------------------------------------------------*/
587    /*------------------------------------------------------------------------------*/
588    /* PHASE VI - insertions */
589    // The memstore limit is 200*1024 and the column family flush threshold is
590    // around 50*1024. We try to just hit the memstore limit with each CF's
591    // memstore being below the CF flush threshold.
592    for (int i = 1; i <= 300; i++) {
593      region.put(createPut(1, i));
594      region.put(createPut(2, i));
595      region.put(createPut(3, i));
596      region.put(createPut(4, i));
597      region.put(createPut(5, i));
598    }
599
600    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
601    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
602    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
603
604    /*------------------------------------------------------------------------------*/
605    /* PHASE VI - Flush */
606    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
607    // None among compacting memstores was flushed to memory due to previous puts.
608    // But is going to be moved to pipeline and flatten due to the flush.
609    region.flush(false);
610    // Since we won't find any CF above the threshold, and hence no specific
611    // store to flush, we should flush all the memstores
612    // Also compacted memstores are flushed to disk, but not entirely emptied
613    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
614    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
615    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
616
617    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
618    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
619    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
620
621    HBaseTestingUtility.closeRegionAndWAL(region);
622  }
623
624  @Test
625  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
626    // Set up the configuration
627    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
628    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
629    // set memstore to do data compaction and not to use the speculative scan
630    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
631      String.valueOf(MemoryCompactionPolicy.EAGER));
632
633    // Intialize the HRegion
634    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
635    verifyInMemoryFlushSize(region);
636    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
637    for (int i = 1; i <= 1200; i++) {
638      region.put(createPut(1, i));
639      if (i <= 100) {
640        region.put(createPut(2, i));
641        if (i <= 50) {
642          region.put(createPut(3, i));
643        }
644      }
645    }
646    // Now add more puts for CF2, so that we only flush CF2 to disk
647    for (int i = 100; i < 2000; i++) {
648      region.put(createPut(2, i));
649    }
650
651    // in this test check the non-composite snapshot - flashing only tail of the pipeline
652    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
653    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
654
655    long totalMemstoreSize = region.getMemStoreDataSize();
656
657    // Find the sizes of the memstores of each CF.
658    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
659    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
660    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
661
662    // Some other sanity checks.
663    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
664    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
665    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
666
667    // The total memstore size should be the same as the sum of the sizes of
668    // memstores of CF1, CF2 and CF3.
669    String msg = "totalMemstoreSize=" + totalMemstoreSize + " DefaultMemStore.DEEP_OVERHEAD="
670      + DefaultMemStore.DEEP_OVERHEAD + " cf1MemstoreSizePhaseI=" + cf1MemstoreSizePhaseI
671      + " cf2MemstoreSizePhaseI=" + cf2MemstoreSizePhaseI + " cf3MemstoreSizePhaseI="
672      + cf3MemstoreSizePhaseI;
673    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
674      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
675
676    // Flush!
677    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
678    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
679    cms1.flushInMemory();
680    cms3.flushInMemory();
681    region.flush(false);
682
683    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
684
685    long smallestSeqInRegionCurrentMemstorePhaseII =
686      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
687    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
688    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
689    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
690
691    // CF2 should have been cleared
692    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
693    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
694
695    String s = "\n\n----------------------------------\n"
696      + "Upon initial insert and flush, LSN of CF1 is:" + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
697      + smallestSeqCF2PhaseII + ". LSN of CF3 is:" + smallestSeqCF3PhaseII
698      + ", smallestSeqInRegionCurrentMemstore:" + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
699
700    // Add same entries to compact them later
701    for (int i = 1; i <= 1200; i++) {
702      region.put(createPut(1, i));
703      if (i <= 100) {
704        region.put(createPut(2, i));
705        if (i <= 50) {
706          region.put(createPut(3, i));
707        }
708      }
709    }
710    // Now add more puts for CF2, so that we only flush CF2 to disk
711    for (int i = 100; i < 2000; i++) {
712      region.put(createPut(2, i));
713    }
714
715    long smallestSeqInRegionCurrentMemstorePhaseIII =
716      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
717    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
718    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
719    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
720
721    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
722      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", "
723      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIII + ", the smallest sequence in CF3:"
724      + smallestSeqCF3PhaseIII + "\n";
725
726    // Flush!
727    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
728    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
729    cms1.flushInMemory();
730    cms3.flushInMemory();
731    region.flush(false);
732
733    long smallestSeqInRegionCurrentMemstorePhaseIV =
734      region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
735    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
736    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
737    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
738
739    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
740      + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", "
741      + "the smallest sequence in CF2:" + smallestSeqCF2PhaseIV + ", the smallest sequence in CF3:"
742      + smallestSeqCF3PhaseIV + "\n";
743
744    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
745    assertTrue(s,
746      smallestSeqInRegionCurrentMemstorePhaseIV > smallestSeqInRegionCurrentMemstorePhaseIII);
747    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
748    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
749
750    HBaseTestingUtility.closeRegionAndWAL(region);
751  }
752
753  @Test
754  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
755    // Set up the configuration
756    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
757    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
758    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
759    // set memstore to do index compaction with merge
760    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
761      String.valueOf(MemoryCompactionPolicy.BASIC));
762    // length of pipeline that requires merge
763    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
764
765    // Intialize the HRegion
766    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
767    verifyInMemoryFlushSize(region);
768    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
769    for (int i = 1; i <= 1200; i++) {
770      region.put(createPut(1, i));
771      if (i <= 100) {
772        region.put(createPut(2, i));
773        if (i <= 50) {
774          region.put(createPut(3, i));
775        }
776      }
777    }
778    // Now put more entries to CF2
779    for (int i = 100; i < 2000; i++) {
780      region.put(createPut(2, i));
781    }
782
783    long totalMemstoreSize = region.getMemStoreDataSize();
784
785    // test in-memory flashing into CAM here
786    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
787      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
788    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
789      .setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
790
791    // Find the sizes of the memstores of each CF.
792    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
793    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
794    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
795
796    // Some other sanity checks.
797    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
798    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
799    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
800
801    // The total memstore size should be the same as the sum of the sizes of
802    // memstores of CF1, CF2 and CF3.
803    assertEquals(totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
804      + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
805
806    // Initiate in-memory Flush!
807    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
808    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
809    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
810    while (
811      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
812        .isMemStoreFlushingInMemory()
813    ) {
814      Threads.sleep(10);
815    }
816    while (
817      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
818        .isMemStoreFlushingInMemory()
819    ) {
820      Threads.sleep(10);
821    }
822
823    // Flush-to-disk! CF2 only should be flushed
824    region.flush(false);
825
826    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
827    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
828    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
829
830    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
831    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
832    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
833    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
834    // CF2 should have been cleared
835    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
836
837    // Add the same amount of entries to see the merging
838    for (int i = 1; i <= 1200; i++) {
839      region.put(createPut(1, i));
840      if (i <= 100) {
841        region.put(createPut(2, i));
842        if (i <= 50) {
843          region.put(createPut(3, i));
844        }
845      }
846    }
847    // Now add more puts for CF2, so that we only flush CF2 to disk
848    for (int i = 100; i < 2000; i++) {
849      region.put(createPut(2, i));
850    }
851
852    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
853
854    // Flush in memory!
855    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
856    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
857    // CF1 and CF3 should be merged so wait here to be sure the merge is done
858    while (
859      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
860        .isMemStoreFlushingInMemory()
861    ) {
862      Threads.sleep(10);
863    }
864    while (
865      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
866        .isMemStoreFlushingInMemory()
867    ) {
868      Threads.sleep(10);
869    }
870    region.flush(false);
871
872    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
873    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
874
875    assertEquals(2 * cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
876    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
877    // should be the same in flattening and in merge (first and second in-memory-flush)
878    // but in phase 1 we do not yet have immutable segment
879    assertEquals(cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
880      cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
881        - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
882    assertEquals(3, // active, one in pipeline, snapshot
883      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).getSegments().size());
884    // CF2 should have been cleared
885    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
886      + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII.getDataSize()
887      + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII.getHeapSize()
888      + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/" + cf2MemstoreSizePhaseII.getDataSize()
889      + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/" + cf2MemstoreSizePhaseII.getHeapSize()
890      + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize() + "/" + cf3MemstoreSizePhaseII.getDataSize()
891      + "--" + cf3MemstoreSizePhaseI.getHeapSize() + "/" + cf3MemstoreSizePhaseII.getHeapSize()
892      + "\n<<< AND before/after second flushes " + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize()
893      + "/" + cf1MemstoreSizePhaseIV.getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize()
894      + "/" + cf1MemstoreSizePhaseIV.getHeapSize() + "\n", 0, cf2MemstoreSizePhaseIV.getDataSize());
895
896    HBaseTestingUtility.closeRegionAndWAL(region);
897  }
898
899  // should end in 300 seconds (5 minutes)
900  @Test
901  public void testStressFlushAndWALinIndexCompaction() throws IOException {
902    // Set up the configuration
903    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
904    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
905      200 * 1024);
906    // set memstore to do data compaction and not to use the speculative scan
907    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
908      String.valueOf(MemoryCompactionPolicy.BASIC));
909
910    // Successfully initialize the HRegion
911    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
912    verifyInMemoryFlushSize(region);
913    Thread[] threads = new Thread[25];
914    for (int i = 0; i < threads.length; i++) {
915      int id = i * 10000;
916      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
917      threads[i] = new Thread(runnable);
918      threads[i].start();
919    }
920    Threads.sleep(10000); // let other threads start
921    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
922    Threads.sleep(10000); // let other threads continue
923    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
924
925    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).flushInMemory();
926    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).flushInMemory();
927    while (
928      ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
929        .isMemStoreFlushingInMemory()
930    ) {
931      Threads.sleep(10);
932    }
933    while (
934      ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
935        .isMemStoreFlushingInMemory()
936    ) {
937      Threads.sleep(10);
938    }
939
940    for (int i = 0; i < threads.length; i++) {
941      try {
942        threads[i].join();
943      } catch (InterruptedException e) {
944        e.printStackTrace();
945      }
946    }
947  }
948
949  /**
950   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
951   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
952   * releases updatesLock and compacts the pipeline.
953   */
954  private class ConcurrentPutRunnable implements Runnable {
955    private final HRegion stressedRegion;
956    private final int startNumber;
957
958    ConcurrentPutRunnable(HRegion r, int i) {
959      this.stressedRegion = r;
960      this.startNumber = i;
961    }
962
963    @Override
964    public void run() {
965
966      try {
967        int dummy = startNumber / 10000;
968        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
969        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
970        for (int i = startNumber; i <= startNumber + 3000; i++) {
971          stressedRegion.put(createPut(1, i));
972          if (i <= startNumber + 2000) {
973            stressedRegion.put(createPut(2, i));
974            if (i <= startNumber + 1000) {
975              stressedRegion.put(createPut(3, i));
976            }
977          }
978        }
979        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
980        // Now add more puts for CF2, so that we only flush CF2 to disk
981        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
982          stressedRegion.put(createPut(2, i));
983        }
984        // And add more puts for CF1
985        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
986          stressedRegion.put(createPut(1, i));
987        }
988        System.out.print("Thread with start number " + startNumber + " flushes\n");
989        // flush (IN MEMORY) one of the stores (each thread flushes different store)
990        // and wait till the flush and the following action are done
991        if (startNumber == 0) {
992          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
993            .flushInMemory();
994          while (
995            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
996              .isMemStoreFlushingInMemory()
997          ) {
998            Threads.sleep(10);
999          }
1000        }
1001        if (startNumber == 10000) {
1002          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1003            .flushInMemory();
1004          while (
1005            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1006              .isMemStoreFlushingInMemory()
1007          ) {
1008            Threads.sleep(10);
1009          }
1010        }
1011        if (startNumber == 20000) {
1012          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1013            .flushInMemory();
1014          while (
1015            ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1016              .isMemStoreFlushingInMemory()
1017          ) {
1018            Threads.sleep(10);
1019          }
1020        }
1021        System.out.print("Thread with start number " + startNumber + " finishes\n");
1022      } catch (IOException e) {
1023        assert false;
1024      }
1025    }
1026  }
1027
1028  private WAL getWAL(Region region) {
1029    return ((HRegion) region).getWAL();
1030  }
1031}