001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HColumnDescriptor;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HRegionInfo;
035import org.apache.hadoop.hbase.HTableDescriptor;
036import org.apache.hadoop.hbase.MemoryCompactionPolicy;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.mockito.Mockito;
049
050/**
 * This test verifies the correctness of the per-column-family flushing strategy
 * when some of the memstores are compacting memstores.
053 */
054@Category({ RegionServerTests.class, LargeTests.class })
055public class TestWalAndCompactingMemStoreFlush {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class);
060
061  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
063  public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
064      "t1");
065
066  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
067      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
068
069  public static final byte[] FAMILY1 = FAMILIES[0];
070  public static final byte[] FAMILY2 = FAMILIES[1];
071  public static final byte[] FAMILY3 = FAMILIES[2];
072
073  private Configuration conf;
074
075  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
076    int i = 0;
077    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
078    for (byte[] family : FAMILIES) {
079      HColumnDescriptor hcd = new HColumnDescriptor(family);
080      // even column families are going to have compacted memstore
081      if (i % 2 == 0) {
082        hcd.setInMemoryCompaction(MemoryCompactionPolicy
083            .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
084      } else {
085        hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
086      }
087      htd.addFamily(hcd);
088      i++;
089    }
090
091    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
092    Path path = new Path(DIR, callingMethod);
093    HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false);
094    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
095    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
096    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
097    region.initialize(null);
098    return region;
099  }
100
101  // A helper function to create puts.
102  private Put createPut(int familyNum, int putNum) {
103    byte[] qf  = Bytes.toBytes("q" + familyNum);
104    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
105    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
106    Put p = new Put(row);
107    p.addColumn(FAMILIES[familyNum - 1], qf, val);
108    return p;
109  }
110
111  // A helper function to create double puts, so something can be compacted later.
112  private Put createDoublePut(int familyNum, int putNum) {
113    byte[] qf  = Bytes.toBytes("q" + familyNum);
114    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
115    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
116    Put p = new Put(row);
117    // add twice with different timestamps
118    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
119    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
120    return p;
121  }
122
123  private void verifyInMemoryFlushSize(Region region) {
124    assertEquals(
125      ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
126      ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
127  }
128
129  @Before
130  public void setup() {
131    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
132    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
133        FlushNonSloppyStoresFirstPolicy.class.getName());
134    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
135  }
136
137  @Test
138  public void testSelectiveFlushWithEager() throws IOException {
139    // Set up the configuration
140    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
141    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
142    // set memstore to do data compaction
143    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
144        String.valueOf(MemoryCompactionPolicy.EAGER));
145
146    // Intialize the region
147    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
148    verifyInMemoryFlushSize(region);
149    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
150    for (int i = 1; i <= 1200; i++) {
151      region.put(createPut(1, i));        // compacted memstore, all the keys are unique
152
153      if (i <= 100) {
154        region.put(createPut(2, i));
155        if (i <= 50) {
156          // compacted memstore, subject for compaction due to duplications
157          region.put(createDoublePut(3, i));
158        }
159      }
160    }
161
162    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
163    for (int i = 100; i < 2000; i++) {
164      region.put(createPut(2, i));
165    }
166
167    long totalMemstoreSize = region.getMemStoreDataSize();
168
169    // Find the smallest LSNs for edits wrt to each CF.
170    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
171    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
172    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
173
174    // Find the sizes of the memstores of each CF.
175    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
176    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
177    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
178
179    // Get the overall smallest LSN in the region's memstores.
180    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
181        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
182
183    String s = "\n\n----------------------------------\n"
184        + "Upon initial insert and before any flush, size of CF1 is:"
185        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
186        + region.getStore(FAMILY1).isSloppyMemStore() + ". Size of CF2 is:"
187        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
188        + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:"
189        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
190        + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
191
192    // The overall smallest LSN in the region's memstores should be the same as
193    // the LSN of the smallest edit in CF1
194    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
195
196    // Some other sanity checks.
197    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
198    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
199    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
200    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
201    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
202
203    // The total memstore size should be the same as the sum of the sizes of
204    // memstores of CF1, CF2 and CF3.
205    String msg = "totalMemstoreSize="+totalMemstoreSize +
206        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
207        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
208        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
209    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
210        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
211
212    // Flush!!!!!!!!!!!!!!!!!!!!!!
213    // We have big compacting memstore CF1 and two small memstores:
214    // CF2 (not compacted) and CF3 (compacting)
215    // All together they are above the flush size lower bound.
216    // Since CF1 and CF3 should be flushed to memory (not to disk),
217    // CF2 is going to be flushed to disk.
218    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
219    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
220    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
221    cms1.flushInMemory();
222    cms3.flushInMemory();
223    region.flush(false);
224
225    // Recalculate everything
226    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
227    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
228    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
229
230    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
231        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
232    // Find the smallest LSNs for edits wrt to each CF.
233    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
234    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
235    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
236
237    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
238        + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
239        + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
240
241    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
242    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
243    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
244
245    // CF2 should become empty
246    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
247    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
248
249    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
250    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
251    assertTrue(
252        cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
253
254    // Now the smallest LSN in the region should be the same as the smallest
255    // LSN in the memstore of CF1.
256    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
257
258    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
259    // memory in next flush
260    for (int i = 1200; i < 3000; i++) {
261      region.put(createPut(1, i));
262    }
263
264    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
265        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
266        "the smallest sequence in CF2:"
267        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
268
269    // How much does the CF1 memstore occupy? Will be used later.
270    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
271    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
272
273    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
274        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
275
276
277    // Flush!!!!!!!!!!!!!!!!!!!!!!
278    // Flush again, CF1 is flushed to disk
279    // CF2 is flushed to disk, because it is not in-memory compacted memstore
280    // CF3 is flushed empty to memory (actually nothing happens to CF3)
281    region.flush(false);
282
283    // Recalculate everything
284    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
285    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
286    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
287
288    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
289        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
290    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
291    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
292    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
293
294    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
295        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
296        + "\n";
297
298    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
299        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
300        "the smallest sequence in CF2:"
301        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
302        + "\n";
303
304    // CF1's pipeline component (inserted before first flush) should be flushed to disk
305    // CF2 should be flushed to disk
306    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
307    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
308    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
309
310    // CF3 shouldn't have been touched.
311    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
312
313    // the smallest LSN of CF3 shouldn't change
314    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
315
316    // CF3 should be bottleneck for WAL
317    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
318
319    // Flush!!!!!!!!!!!!!!!!!!!!!!
320    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
321    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
322    region.flush(true);
323
324    // Recalculate everything
325    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
326    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
327    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
328    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
329        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
330
331    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
332    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
333    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
334    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
335    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
336    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
337
338    // What happens when we hit the memstore limit, but we are not able to find
339    // any Column Family above the threshold?
340    // In that case, we should flush all the CFs.
341
342    // The memstore limit is 100*1024 and the column family flush threshold is
343    // around 25*1024. We try to just hit the memstore limit with each CF's
344    // memstore being below the CF flush threshold.
345    for (int i = 1; i <= 300; i++) {
346      region.put(createPut(1, i));
347      region.put(createPut(2, i));
348      region.put(createPut(3, i));
349      region.put(createPut(4, i));
350      region.put(createPut(5, i));
351    }
352
353    region.flush(false);
354
355    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
356        + smallestSeqInRegionCurrentMemstorePhaseV
357        + ". After additional inserts and last flush, the entire region size is:" + region
358        .getMemStoreDataSize()
359        + "\n----------------------------------\n";
360
361    // Since we won't find any CF above the threshold, and hence no specific
362    // store to flush, we should flush all the memstores
363    // Also compacted memstores are flushed to disk.
364    assertEquals(0, region.getMemStoreDataSize());
365    System.out.println(s);
366    HBaseTestingUtility.closeRegionAndWAL(region);
367  }
368
369  /*------------------------------------------------------------------------------*/
370  /* Check the same as above but for index-compaction type of compacting memstore */
371  @Test
372  public void testSelectiveFlushWithIndexCompaction() throws IOException {
373    /*------------------------------------------------------------------------------*/
374    /* SETUP */
375    // Set up the configuration
376    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
377    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
378    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
379    // set memstore to index-compaction
380    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
381        String.valueOf(MemoryCompactionPolicy.BASIC));
382
383    // Initialize the region
384    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
385    verifyInMemoryFlushSize(region);
386    /*------------------------------------------------------------------------------*/
387    /* PHASE I - insertions */
388    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
389    for (int i = 1; i <= 1200; i++) {
390      region.put(createPut(1, i));        // compacted memstore
391      if (i <= 100) {
392        region.put(createPut(2, i));
393        if (i <= 50) {
394          region.put(createDoublePut(3, i)); // subject for in-memory compaction
395        }
396      }
397    }
398    // Now add more puts for CF2, so that we only flush CF2 to disk
399    for (int i = 100; i < 2000; i++) {
400      region.put(createPut(2, i));
401    }
402
403    /*------------------------------------------------------------------------------*/
404    /*------------------------------------------------------------------------------*/
405    /* PHASE I - collect sizes */
406    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
407    // Find the smallest LSNs for edits wrt to each CF.
408    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
409    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
410    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
411    // Find the sizes of the memstores of each CF.
412    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
413    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
414    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
415    // Get the overall smallest LSN in the region's memstores.
416    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
417        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
418
419    /*------------------------------------------------------------------------------*/
420    /* PHASE I - validation */
421    // The overall smallest LSN in the region's memstores should be the same as
422    // the LSN of the smallest edit in CF1
423    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
424    // Some other sanity checks.
425    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
426    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
427    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
428    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
429    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
430
431    // The total memstore size should be the same as the sum of the sizes of
432    // memstores of CF1, CF2 and CF3.
433    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
434        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
435
436    /*------------------------------------------------------------------------------*/
437    /* PHASE I - Flush */
438    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
439    // CF1, CF2, CF3, all together they are above the flush size lower bound.
440    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
441    // CF1 and CF3 - flushed to memory and flatten explicitly
442    region.flush(false);
443    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
444    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
445    cms1.flushInMemory();
446    cms3.flushInMemory();
447
448    // CF3/CF1 should be merged so wait here to be sure the compaction is done
449    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
450        .isMemStoreFlushingInMemory()) {
451      Threads.sleep(10);
452    }
453    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
454        .isMemStoreFlushingInMemory()) {
455      Threads.sleep(10);
456    }
457
458    /*------------------------------------------------------------------------------*/
459    /*------------------------------------------------------------------------------*/
460    /* PHASE II - collect sizes */
461    // Recalculate everything
462    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
463    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
464    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
465    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
466        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
467    // Find the smallest LSNs for edits wrt to each CF.
468    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
469    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
470
471    /*------------------------------------------------------------------------------*/
472    /* PHASE II - validation */
473    // CF1 was flushed to memory, should be flattened and take less space
474    assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize());
475    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
476    // CF2 should become empty
477    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
478    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
479    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
480    // if compacted CF# should be at least twice less because its every key was duplicated
481    assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
482    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
483
484    // Now the smallest LSN in the region should be the same as the smallest
485    // LSN in the memstore of CF1.
486    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
487    // The total memstore size should be the same as the sum of the sizes of
488    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
489    // items in CF1/2
490    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
491        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
492
493    /*------------------------------------------------------------------------------*/
494    /*------------------------------------------------------------------------------*/
495    /* PHASE III - insertions */
496    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
497    // memory in next flush. This is causing the CF! to be flushed to memory twice.
498    for (int i = 1200; i < 8000; i++) {
499      region.put(createPut(1, i));
500    }
501
502    // CF1 should be flatten and merged so wait here to be sure the compaction is done
503    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
504        .isMemStoreFlushingInMemory()) {
505      Threads.sleep(10);
506    }
507
508    /*------------------------------------------------------------------------------*/
509    /* PHASE III - collect sizes */
510    // How much does the CF1 memstore occupy now? Will be used later.
511    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
512    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
513
514    /*------------------------------------------------------------------------------*/
515    /* PHASE III - validation */
516    // The total memstore size should be the same as the sum of the sizes of
517    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
518    // items in CF1/2
519    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
520        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
521
522    /*------------------------------------------------------------------------------*/
523    /* PHASE III - Flush */
524    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
525    // CF1 is flushed to disk, but not entirely emptied.
526    // CF2 was and remained empty, same way nothing happens to CF3
527    region.flush(false);
528
529    /*------------------------------------------------------------------------------*/
530    /*------------------------------------------------------------------------------*/
531    /* PHASE IV - collect sizes */
532    // Recalculate everything
533    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
534    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
535    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
536    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
537        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
538    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
539
540    /*------------------------------------------------------------------------------*/
541    /* PHASE IV - validation */
542    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
543    // CF2 should remain empty
544    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
545    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
546    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
547    // CF3 shouldn't have been touched.
548    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
549    // the smallest LSN of CF3 shouldn't change
550    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
551    // CF3 should be bottleneck for WAL
552    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
553
554    /*------------------------------------------------------------------------------*/
555    /* PHASE IV - Flush */
556    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
557    // Force flush to disk on all memstores (flush parameter true).
558    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
559    region.flush(true);
560
561    /*------------------------------------------------------------------------------*/
562    /*------------------------------------------------------------------------------*/
563    /* PHASE V - collect sizes */
564    // Recalculate everything
565    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
566    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
567    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
568    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
569        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
570    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
571
572    /*------------------------------------------------------------------------------*/
573    /* PHASE V - validation */
574    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
575    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
576    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
577    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
578    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
579    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
580    // The total memstores size should be empty
581    assertEquals(0, totalMemstoreSizePhaseV);
582    // Because there is nothing in any memstore the WAL's LSN should be -1
583    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
584
585    // What happens when we hit the memstore limit, but we are not able to find
586    // any Column Family above the threshold?
587    // In that case, we should flush all the CFs.
588
589    /*------------------------------------------------------------------------------*/
590    /*------------------------------------------------------------------------------*/
591    /* PHASE VI - insertions */
592    // The memstore limit is 200*1024 and the column family flush threshold is
593    // around 50*1024. We try to just hit the memstore limit with each CF's
594    // memstore being below the CF flush threshold.
595    for (int i = 1; i <= 300; i++) {
596      region.put(createPut(1, i));
597      region.put(createPut(2, i));
598      region.put(createPut(3, i));
599      region.put(createPut(4, i));
600      region.put(createPut(5, i));
601    }
602
603    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
604    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
605    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
606
607    /*------------------------------------------------------------------------------*/
608    /* PHASE VI - Flush */
609    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
610    // None among compacting memstores was flushed to memory due to previous puts.
611    // But is going to be moved to pipeline and flatten due to the flush.
612    region.flush(false);
613    // Since we won't find any CF above the threshold, and hence no specific
614    // store to flush, we should flush all the memstores
615    // Also compacted memstores are flushed to disk, but not entirely emptied
616    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
617    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
618    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
619
620    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
621    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
622    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
623
624    HBaseTestingUtility.closeRegionAndWAL(region);
625  }
626
627  @Test
628  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
629    // Set up the configuration
630    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
631    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
632    // set memstore to do data compaction and not to use the speculative scan
633    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
634        String.valueOf(MemoryCompactionPolicy.EAGER));
635
636    // Intialize the HRegion
637    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
638    verifyInMemoryFlushSize(region);
639    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
640    for (int i = 1; i <= 1200; i++) {
641      region.put(createPut(1, i));
642      if (i <= 100) {
643        region.put(createPut(2, i));
644        if (i <= 50) {
645          region.put(createPut(3, i));
646        }
647      }
648    }
649    // Now add more puts for CF2, so that we only flush CF2 to disk
650    for (int i = 100; i < 2000; i++) {
651      region.put(createPut(2, i));
652    }
653
654    // in this test check the non-composite snapshot - flashing only tail of the pipeline
655    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
656    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
657
658    long totalMemstoreSize = region.getMemStoreDataSize();
659
660    // Find the sizes of the memstores of each CF.
661    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
662    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
663    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
664
665    // Some other sanity checks.
666    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
667    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
668    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
669
670    // The total memstore size should be the same as the sum of the sizes of
671    // memstores of CF1, CF2 and CF3.
672    String msg = "totalMemstoreSize="+totalMemstoreSize +
673        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
674        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
675        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
676        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
677    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
678        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
679
680    // Flush!
681    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
682    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
683    cms1.flushInMemory();
684    cms3.flushInMemory();
685    region.flush(false);
686
687    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
688
689    long smallestSeqInRegionCurrentMemstorePhaseII =
690        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
691    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
692    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
693    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
694
695    // CF2 should have been cleared
696    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
697    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
698
699    String s = "\n\n----------------------------------\n"
700        + "Upon initial insert and flush, LSN of CF1 is:"
701        + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
702        + smallestSeqCF2PhaseII + ". LSN of CF3 is:"
703        + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
704        + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
705
706    // Add same entries to compact them later
707    for (int i = 1; i <= 1200; i++) {
708      region.put(createPut(1, i));
709      if (i <= 100) {
710        region.put(createPut(2, i));
711        if (i <= 50) {
712          region.put(createPut(3, i));
713        }
714      }
715    }
716    // Now add more puts for CF2, so that we only flush CF2 to disk
717    for (int i = 100; i < 2000; i++) {
718      region.put(createPut(2, i));
719    }
720
721    long smallestSeqInRegionCurrentMemstorePhaseIII =
722        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
723    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
724    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
725    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
726
727    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
728        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " +
729        "the smallest sequence in CF2:"
730        + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
731
732    // Flush!
733    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
734    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
735    cms1.flushInMemory();
736    cms3.flushInMemory();
737    region.flush(false);
738
739    long smallestSeqInRegionCurrentMemstorePhaseIV =
740        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
741    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
742    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
743    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
744
745    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
746        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
747        "the smallest sequence in CF2:"
748        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n";
749
750    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
751    assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV >
752        smallestSeqInRegionCurrentMemstorePhaseIII);
753    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
754    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
755
756    HBaseTestingUtility.closeRegionAndWAL(region);
757  }
758
759  @Test
760  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
761    // Set up the configuration
762    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
763    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
764    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
765    // set memstore to do index compaction with merge
766    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
767        String.valueOf(MemoryCompactionPolicy.BASIC));
768    // length of pipeline that requires merge
769    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
770
771    // Intialize the HRegion
772    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
773    verifyInMemoryFlushSize(region);
774    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
775    for (int i = 1; i <= 1200; i++) {
776      region.put(createPut(1, i));
777      if (i <= 100) {
778        region.put(createPut(2, i));
779        if (i <= 50) {
780          region.put(createPut(3, i));
781        }
782      }
783    }
784    // Now put more entries to CF2
785    for (int i = 100; i < 2000; i++) {
786      region.put(createPut(2, i));
787    }
788
789    long totalMemstoreSize = region.getMemStoreDataSize();
790
791    // test in-memory flashing into CAM here
792    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).setIndexType(
793        CompactingMemStore.IndexType.ARRAY_MAP);
794    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).setIndexType(
795        CompactingMemStore.IndexType.ARRAY_MAP);
796
797    // Find the sizes of the memstores of each CF.
798    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
799    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
800    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
801
802    // Some other sanity checks.
803    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
804    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
805    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
806
807    // The total memstore size should be the same as the sum of the sizes of
808    // memstores of CF1, CF2 and CF3.
809    assertEquals(totalMemstoreSize,
810        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
811            + cf3MemstoreSizePhaseI.getDataSize());
812
813    // Initiate in-memory Flush!
814    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
815    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
816    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
817    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
818        .isMemStoreFlushingInMemory()) {
819      Threads.sleep(10);
820    }
821    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
822        .isMemStoreFlushingInMemory()) {
823      Threads.sleep(10);
824    }
825
826    // Flush-to-disk! CF2 only should be flushed
827    region.flush(false);
828
829    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
830    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
831    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
832
833    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
834    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
835    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
836    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
837    // CF2 should have been cleared
838    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
839
840    // Add the same amount of entries to see the merging
841    for (int i = 1; i <= 1200; i++) {
842      region.put(createPut(1, i));
843      if (i <= 100) {
844        region.put(createPut(2, i));
845        if (i <= 50) {
846          region.put(createPut(3, i));
847        }
848      }
849    }
850    // Now add more puts for CF2, so that we only flush CF2 to disk
851    for (int i = 100; i < 2000; i++) {
852      region.put(createPut(2, i));
853    }
854
855    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
856
857    // Flush in memory!
858    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
859    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
860    // CF1 and CF3 should be merged so wait here to be sure the merge is done
861    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
862        .isMemStoreFlushingInMemory()) {
863      Threads.sleep(10);
864    }
865    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
866        .isMemStoreFlushingInMemory()) {
867      Threads.sleep(10);
868    }
869    region.flush(false);
870
871    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
872    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
873
874    assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
875    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
876    // should be the same in flattening and in merge (first and second in-memory-flush)
877    // but in phase 1 we do not yet have immutable segment
878    assertEquals(
879        cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
880        cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
881            - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
882    assertEquals(3, // active, one in pipeline, snapshot
883        ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
884    // CF2 should have been cleared
885    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
886            + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
887            .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
888            .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
889            + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
890            + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
891            + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
892            + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
893            + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
894            .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
895            .getHeapSize() + "\n",
896        0, cf2MemstoreSizePhaseIV.getDataSize());
897
898    HBaseTestingUtility.closeRegionAndWAL(region);
899  }
900
901  // should end in 300 seconds (5 minutes)
902  @Test
903  public void testStressFlushAndWALinIndexCompaction() throws IOException {
904    // Set up the configuration
905    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
906    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
907        200 * 1024);
908    // set memstore to do data compaction and not to use the speculative scan
909    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
910        String.valueOf(MemoryCompactionPolicy.BASIC));
911
912    // Successfully initialize the HRegion
913    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
914    verifyInMemoryFlushSize(region);
915    Thread[] threads = new Thread[25];
916    for (int i = 0; i < threads.length; i++) {
917      int id = i * 10000;
918      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
919      threads[i] = new Thread(runnable);
920      threads[i].start();
921    }
922    Threads.sleep(10000); // let other threads start
923    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
924    Threads.sleep(10000); // let other threads continue
925    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
926
927    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
928    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
929    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
930        .isMemStoreFlushingInMemory()) {
931      Threads.sleep(10);
932    }
933    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
934        .isMemStoreFlushingInMemory()) {
935      Threads.sleep(10);
936    }
937
938    for (int i = 0; i < threads.length; i++) {
939      try {
940        threads[i].join();
941      } catch (InterruptedException e) {
942        e.printStackTrace();
943      }
944    }
945  }
946
947  /**
948   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
949   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
950   * releases updatesLock and compacts the pipeline.
951   */
952  private class ConcurrentPutRunnable implements Runnable {
953    private final HRegion stressedRegion;
954    private final int startNumber;
955
956    ConcurrentPutRunnable(HRegion r, int i) {
957      this.stressedRegion = r;
958      this.startNumber = i;
959    }
960
961    @Override
962    public void run() {
963
964      try {
965        int dummy = startNumber / 10000;
966        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
967        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
968        for (int i = startNumber; i <= startNumber + 3000; i++) {
969          stressedRegion.put(createPut(1, i));
970          if (i <= startNumber + 2000) {
971            stressedRegion.put(createPut(2, i));
972            if (i <= startNumber + 1000) {
973              stressedRegion.put(createPut(3, i));
974            }
975          }
976        }
977        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
978        // Now add more puts for CF2, so that we only flush CF2 to disk
979        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
980          stressedRegion.put(createPut(2, i));
981        }
982        // And add more puts for CF1
983        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
984          stressedRegion.put(createPut(1, i));
985        }
986        System.out.print("Thread with start number " + startNumber + " flushes\n");
987        // flush (IN MEMORY) one of the stores (each thread flushes different store)
988        // and wait till the flush and the following action are done
989        if (startNumber == 0) {
990          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
991              .flushInMemory();
992          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
993              .isMemStoreFlushingInMemory()) {
994            Threads.sleep(10);
995          }
996        }
997        if (startNumber == 10000) {
998          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore).flushInMemory();
999          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1000              .isMemStoreFlushingInMemory()) {
1001            Threads.sleep(10);
1002          }
1003        }
1004        if (startNumber == 20000) {
1005          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore).flushInMemory();
1006          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1007              .isMemStoreFlushingInMemory()) {
1008            Threads.sleep(10);
1009          }
1010        }
1011        System.out.print("Thread with start number " + startNumber + " finishes\n");
1012      } catch (IOException e) {
1013        assert false;
1014      }
1015    }
1016  }
1017
1018  private WAL getWAL(Region region) {
1019    return ((HRegion)region).getWAL();
1020  }
1021}