001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ThreadPoolExecutor;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.MemoryCompactionPolicy;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.hadoop.hbase.wal.WAL;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.mockito.Mockito;
049
050/**
051 * This test verifies the correctness of the Per Column Family flushing strategy
052 * when part of the memstores are compacted memstores
053 */
054@Category({ RegionServerTests.class, LargeTests.class })
055public class TestWalAndCompactingMemStoreFlush {
056
  // JUnit class rule built for this test class via HBaseClassTestRule.forClass.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWalAndCompactingMemStoreFlush.class);

  // Shared testing utility; supplies the base configuration and the data test directory.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Parent directory for the regions created by initHRegion (one sub-directory per calling
  // method). NOTE(review): the directory is named "TestHRegion", not after this class --
  // looks like a copy/paste leftover; confirm it cannot collide with other tests.
  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
  // Table "t1" in namespace "TestWalAndCompactingMemStoreFlush".
  public static final TableName TABLENAME = TableName.valueOf("TestWalAndCompactingMemStoreFlush",
      "t1");

  // The five column families f1..f5 of the test table. initHRegion gives the families at even
  // array indices (f1, f3, f5) the configured in-memory compaction policy and the others NONE.
  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };

  // Shorthands for the first three families, referred to as CF1, CF2 and CF3 in the tests.
  public static final byte[] FAMILY1 = FAMILIES[0];
  public static final byte[] FAMILY2 = FAMILIES[1];
  public static final byte[] FAMILY3 = FAMILIES[2];

  // Per-test configuration; re-created from TEST_UTIL's configuration in setup().
  private Configuration conf;
074
075  private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
076    int i = 0;
077    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLENAME);
078    for (byte[] family : FAMILIES) {
079      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
080      // even column families are going to have compacted memstore
081      if (i % 2 == 0) {
082        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy
083          .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY)));
084      } else {
085        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
086      }
087      builder.setColumnFamily(cfBuilder.build());
088      i++;
089    }
090
091    RegionInfo info = RegionInfoBuilder.newBuilder(TABLENAME).build();
092    Path path = new Path(DIR, callingMethod);
093    HRegion region =
094      HBaseTestingUtil.createRegionAndWAL(info, path, conf, builder.build(), false);
095    region.regionServicesForStores = Mockito.spy(region.regionServicesForStores);
096    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
097    Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
098    region.initialize(null);
099    return region;
100  }
101
102  // A helper function to create puts.
103  private Put createPut(int familyNum, int putNum) {
104    byte[] qf  = Bytes.toBytes("q" + familyNum);
105    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
106    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
107    Put p = new Put(row);
108    p.addColumn(FAMILIES[familyNum - 1], qf, val);
109    return p;
110  }
111
112  // A helper function to create double puts, so something can be compacted later.
113  private Put createDoublePut(int familyNum, int putNum) {
114    byte[] qf  = Bytes.toBytes("q" + familyNum);
115    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
116    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
117    Put p = new Put(row);
118    // add twice with different timestamps
119    p.addColumn(FAMILIES[familyNum - 1], qf, 10, val);
120    p.addColumn(FAMILIES[familyNum - 1], qf, 20, val);
121    return p;
122  }
123
124  private void verifyInMemoryFlushSize(Region region) {
125    assertEquals(
126      ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(),
127      ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize());
128  }
129
130  @Before
131  public void setup() {
132    conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
133    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
134        FlushNonSloppyStoresFirstPolicy.class.getName());
135    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
136  }
137
138  @Test
139  public void testSelectiveFlushWithEager() throws IOException {
140    // Set up the configuration
141    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
142    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
143    // set memstore to do data compaction
144    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
145        String.valueOf(MemoryCompactionPolicy.EAGER));
146
147    // Intialize the region
148    HRegion region = initHRegion("testSelectiveFlushWithEager", conf);
149    verifyInMemoryFlushSize(region);
150    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
151    for (int i = 1; i <= 1200; i++) {
152      region.put(createPut(1, i));        // compacted memstore, all the keys are unique
153
154      if (i <= 100) {
155        region.put(createPut(2, i));
156        if (i <= 50) {
157          // compacted memstore, subject for compaction due to duplications
158          region.put(createDoublePut(3, i));
159        }
160      }
161    }
162
163    // Now add more puts for CF2, so that we only flush CF2 (DefaultMemStore) to disk
164    for (int i = 100; i < 2000; i++) {
165      region.put(createPut(2, i));
166    }
167
168    long totalMemstoreSize = region.getMemStoreDataSize();
169
170    // Find the smallest LSNs for edits wrt to each CF.
171    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
172    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
173    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
174
175    // Find the sizes of the memstores of each CF.
176    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
177    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
178    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
179
180    // Get the overall smallest LSN in the region's memstores.
181    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
182        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
183
184    String s = "\n\n----------------------------------\n"
185        + "Upon initial insert and before any flush, size of CF1 is:"
186        + cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
187        + region.getStore(FAMILY1).isSloppyMemStore() + ". Size of CF2 is:"
188        + cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
189        + region.getStore(FAMILY2).isSloppyMemStore() + ". Size of CF3 is:"
190        + cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
191        + region.getStore(FAMILY3).isSloppyMemStore() + "\n";
192
193    // The overall smallest LSN in the region's memstores should be the same as
194    // the LSN of the smallest edit in CF1
195    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
196
197    // Some other sanity checks.
198    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
199    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
200    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
201    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
202    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
203
204    // The total memstore size should be the same as the sum of the sizes of
205    // memstores of CF1, CF2 and CF3.
206    String msg = "totalMemstoreSize="+totalMemstoreSize +
207        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
208        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
209        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
210    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
211        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
212
213    // Flush!!!!!!!!!!!!!!!!!!!!!!
214    // We have big compacting memstore CF1 and two small memstores:
215    // CF2 (not compacted) and CF3 (compacting)
216    // All together they are above the flush size lower bound.
217    // Since CF1 and CF3 should be flushed to memory (not to disk),
218    // CF2 is going to be flushed to disk.
219    // CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
220    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
221    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
222    cms1.flushInMemory();
223    cms3.flushInMemory();
224    region.flush(false);
225
226    // Recalculate everything
227    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
228    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
229    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
230
231    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
232        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
233    // Find the smallest LSNs for edits wrt to each CF.
234    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
235    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
236    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
237
238    s = s + "\n----After first flush! CF1 should be flushed to memory, but not compacted.---\n"
239        + "Size of CF1 is:" + cf1MemstoreSizePhaseII + ", size of CF2 is:" + cf2MemstoreSizePhaseII
240        + ", size of CF3 is:" + cf3MemstoreSizePhaseII + "\n";
241
242    // CF1 was flushed to memory, but there is nothing to compact, and CF1 was flattened
243    assertTrue(cf1MemstoreSizePhaseII.getDataSize() == cf1MemstoreSizePhaseI.getDataSize());
244    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
245
246    // CF2 should become empty
247    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
248    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
249
250    // verify that CF3 was flushed to memory and was compacted (this is approximation check)
251    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > cf3MemstoreSizePhaseII.getDataSize());
252    assertTrue(
253        cf3MemstoreSizePhaseI.getHeapSize() / 2 > cf3MemstoreSizePhaseII.getHeapSize());
254
255    // Now the smallest LSN in the region should be the same as the smallest
256    // LSN in the memstore of CF1.
257    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
258
259    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
260    // memory in next flush
261    for (int i = 1200; i < 3000; i++) {
262      region.put(createPut(1, i));
263    }
264
265    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseII
266        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseII + ", " +
267        "the smallest sequence in CF2:"
268        + smallestSeqCF2PhaseII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseII + "\n";
269
270    // How much does the CF1 memstore occupy? Will be used later.
271    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
272    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
273
274    s = s + "----After more puts into CF1 its size is:" + cf1MemstoreSizePhaseIII
275        + ", and its sequence is:" + smallestSeqCF1PhaseIII + " ----\n" ;
276
277
278    // Flush!!!!!!!!!!!!!!!!!!!!!!
279    // Flush again, CF1 is flushed to disk
280    // CF2 is flushed to disk, because it is not in-memory compacted memstore
281    // CF3 is flushed empty to memory (actually nothing happens to CF3)
282    region.flush(false);
283
284    // Recalculate everything
285    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
286    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
287    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
288
289    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
290        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
291    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
292    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
293    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
294
295    s = s + "----After SECOND FLUSH, CF1 size is:" + cf1MemstoreSizePhaseIV + ", CF2 size is:"
296        + cf2MemstoreSizePhaseIV + " and CF3 size is:" + cf3MemstoreSizePhaseIV
297        + "\n";
298
299    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
300        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
301        "the smallest sequence in CF2:"
302        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV
303        + "\n";
304
305    // CF1's pipeline component (inserted before first flush) should be flushed to disk
306    // CF2 should be flushed to disk
307    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
308    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
309    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
310
311    // CF3 shouldn't have been touched.
312    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
313
314    // the smallest LSN of CF3 shouldn't change
315    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
316
317    // CF3 should be bottleneck for WAL
318    assertEquals(s, smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
319
320    // Flush!!!!!!!!!!!!!!!!!!!!!!
321    // Trying to clean the existing memstores, CF2 all flushed to disk. The single
322    // memstore segment in the compaction pipeline of CF1 and CF3 should be flushed to disk.
323    region.flush(true);
324
325    // Recalculate everything
326    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
327    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
328    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
329    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
330        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
331
332    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
333    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
334    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
335    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
336    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
337    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
338
339    // What happens when we hit the memstore limit, but we are not able to find
340    // any Column Family above the threshold?
341    // In that case, we should flush all the CFs.
342
343    // The memstore limit is 100*1024 and the column family flush threshold is
344    // around 25*1024. We try to just hit the memstore limit with each CF's
345    // memstore being below the CF flush threshold.
346    for (int i = 1; i <= 300; i++) {
347      region.put(createPut(1, i));
348      region.put(createPut(2, i));
349      region.put(createPut(3, i));
350      region.put(createPut(4, i));
351      region.put(createPut(5, i));
352    }
353
354    region.flush(false);
355
356    s = s + "----AFTER THIRD AND FORTH FLUSH, The smallest sequence in region WAL is: "
357        + smallestSeqInRegionCurrentMemstorePhaseV
358        + ". After additional inserts and last flush, the entire region size is:" + region
359        .getMemStoreDataSize()
360        + "\n----------------------------------\n";
361
362    // Since we won't find any CF above the threshold, and hence no specific
363    // store to flush, we should flush all the memstores
364    // Also compacted memstores are flushed to disk.
365    assertEquals(0, region.getMemStoreDataSize());
366    System.out.println(s);
367    HBaseTestingUtil.closeRegionAndWAL(region);
368  }
369
370  /*------------------------------------------------------------------------------*/
371  /* Check the same as above but for index-compaction type of compacting memstore */
372  @Test
373  public void testSelectiveFlushWithIndexCompaction() throws IOException {
374    /*------------------------------------------------------------------------------*/
375    /* SETUP */
376    // Set up the configuration
377    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
378    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
379    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.5);
380    // set memstore to index-compaction
381    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
382        String.valueOf(MemoryCompactionPolicy.BASIC));
383
384    // Initialize the region
385    HRegion region = initHRegion("testSelectiveFlushWithIndexCompaction", conf);
386    verifyInMemoryFlushSize(region);
387    /*------------------------------------------------------------------------------*/
388    /* PHASE I - insertions */
389    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
390    for (int i = 1; i <= 1200; i++) {
391      region.put(createPut(1, i));        // compacted memstore
392      if (i <= 100) {
393        region.put(createPut(2, i));
394        if (i <= 50) {
395          region.put(createDoublePut(3, i)); // subject for in-memory compaction
396        }
397      }
398    }
399    // Now add more puts for CF2, so that we only flush CF2 to disk
400    for (int i = 100; i < 2000; i++) {
401      region.put(createPut(2, i));
402    }
403
404    /*------------------------------------------------------------------------------*/
405    /*------------------------------------------------------------------------------*/
406    /* PHASE I - collect sizes */
407    long totalMemstoreSizePhaseI = region.getMemStoreDataSize();
408    // Find the smallest LSNs for edits wrt to each CF.
409    long smallestSeqCF1PhaseI = region.getOldestSeqIdOfStore(FAMILY1);
410    long smallestSeqCF2PhaseI = region.getOldestSeqIdOfStore(FAMILY2);
411    long smallestSeqCF3PhaseI = region.getOldestSeqIdOfStore(FAMILY3);
412    // Find the sizes of the memstores of each CF.
413    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
414    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
415    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
416    // Get the overall smallest LSN in the region's memstores.
417    long smallestSeqInRegionCurrentMemstorePhaseI = getWAL(region)
418        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
419
420    /*------------------------------------------------------------------------------*/
421    /* PHASE I - validation */
422    // The overall smallest LSN in the region's memstores should be the same as
423    // the LSN of the smallest edit in CF1
424    assertEquals(smallestSeqCF1PhaseI, smallestSeqInRegionCurrentMemstorePhaseI);
425    // Some other sanity checks.
426    assertTrue(smallestSeqCF1PhaseI < smallestSeqCF2PhaseI);
427    assertTrue(smallestSeqCF2PhaseI < smallestSeqCF3PhaseI);
428    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
429    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
430    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
431
432    // The total memstore size should be the same as the sum of the sizes of
433    // memstores of CF1, CF2 and CF3.
434    assertEquals(totalMemstoreSizePhaseI, cf1MemstoreSizePhaseI.getDataSize()
435        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
436
437    /*------------------------------------------------------------------------------*/
438    /* PHASE I - Flush */
439    // First Flush in Test!!!!!!!!!!!!!!!!!!!!!!
440    // CF1, CF2, CF3, all together they are above the flush size lower bound.
441    // Since CF1 and CF3 are compacting, CF2 is going to be flushed to disk.
442    // CF1 and CF3 - flushed to memory and flatten explicitly
443    region.flush(false);
444    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
445    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
446    cms1.flushInMemory();
447    cms3.flushInMemory();
448
449    // CF3/CF1 should be merged so wait here to be sure the compaction is done
450    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
451        .isMemStoreFlushingInMemory()) {
452      Threads.sleep(10);
453    }
454    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore)
455        .isMemStoreFlushingInMemory()) {
456      Threads.sleep(10);
457    }
458
459    /*------------------------------------------------------------------------------*/
460    /*------------------------------------------------------------------------------*/
461    /* PHASE II - collect sizes */
462    // Recalculate everything
463    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
464    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
465    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
466    long smallestSeqInRegionCurrentMemstorePhaseII = getWAL(region)
467        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
468    // Find the smallest LSNs for edits wrt to each CF.
469    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
470    long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
471
472    /*------------------------------------------------------------------------------*/
473    /* PHASE II - validation */
474    // CF1 was flushed to memory, should be flattened and take less space
475    assertEquals(cf1MemstoreSizePhaseII.getDataSize() , cf1MemstoreSizePhaseI.getDataSize());
476    assertTrue(cf1MemstoreSizePhaseII.getHeapSize() < cf1MemstoreSizePhaseI.getHeapSize());
477    // CF2 should become empty
478    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
479    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
480    // verify that CF3 was flushed to memory and was not compacted (this is an approximation check)
481    // if compacted CF# should be at least twice less because its every key was duplicated
482    assertEquals(cf3MemstoreSizePhaseII.getDataSize() , cf3MemstoreSizePhaseI.getDataSize());
483    assertTrue(cf3MemstoreSizePhaseI.getHeapSize() / 2 < cf3MemstoreSizePhaseII.getHeapSize());
484
485    // Now the smallest LSN in the region should be the same as the smallest
486    // LSN in the memstore of CF1.
487    assertEquals(smallestSeqInRegionCurrentMemstorePhaseII, smallestSeqCF1PhaseI);
488    // The total memstore size should be the same as the sum of the sizes of
489    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
490    // items in CF1/2
491    assertEquals(totalMemstoreSizePhaseII, cf1MemstoreSizePhaseII.getDataSize()
492        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
493
494    /*------------------------------------------------------------------------------*/
495    /*------------------------------------------------------------------------------*/
496    /* PHASE III - insertions */
497    // Now add more puts for CF1, so that we also flush CF1 to disk instead of
498    // memory in next flush. This is causing the CF! to be flushed to memory twice.
499    for (int i = 1200; i < 8000; i++) {
500      region.put(createPut(1, i));
501    }
502
503    // CF1 should be flatten and merged so wait here to be sure the compaction is done
504    while (((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore)
505        .isMemStoreFlushingInMemory()) {
506      Threads.sleep(10);
507    }
508
509    /*------------------------------------------------------------------------------*/
510    /* PHASE III - collect sizes */
511    // How much does the CF1 memstore occupy now? Will be used later.
512    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
513    long totalMemstoreSizePhaseIII = region.getMemStoreDataSize();
514
515    /*------------------------------------------------------------------------------*/
516    /* PHASE III - validation */
517    // The total memstore size should be the same as the sum of the sizes of
518    // memstores of CF1, CF2 and CF3. Counting the empty active segments in CF1/2/3 and pipeline
519    // items in CF1/2
520    assertEquals(totalMemstoreSizePhaseIII, cf1MemstoreSizePhaseIII.getDataSize()
521        + cf2MemstoreSizePhaseII.getDataSize() + cf3MemstoreSizePhaseII.getDataSize());
522
523    /*------------------------------------------------------------------------------*/
524    /* PHASE III - Flush */
525    // Second Flush in Test!!!!!!!!!!!!!!!!!!!!!!
526    // CF1 is flushed to disk, but not entirely emptied.
527    // CF2 was and remained empty, same way nothing happens to CF3
528    region.flush(false);
529
530    /*------------------------------------------------------------------------------*/
531    /*------------------------------------------------------------------------------*/
532    /* PHASE IV - collect sizes */
533    // Recalculate everything
534    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
535    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
536    MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
537    long smallestSeqInRegionCurrentMemstorePhaseIV = getWAL(region)
538        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
539    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
540
541    /*------------------------------------------------------------------------------*/
542    /* PHASE IV - validation */
543    // CF1's biggest pipeline component (inserted before first flush) should be flushed to disk
544    // CF2 should remain empty
545    assertTrue(cf1MemstoreSizePhaseIII.getDataSize() > cf1MemstoreSizePhaseIV.getDataSize());
546    assertEquals(0, cf2MemstoreSizePhaseIV.getDataSize());
547    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseIV.getHeapSize());
548    // CF3 shouldn't have been touched.
549    assertEquals(cf3MemstoreSizePhaseIV, cf3MemstoreSizePhaseII);
550    // the smallest LSN of CF3 shouldn't change
551    assertEquals(smallestSeqCF3PhaseII, smallestSeqCF3PhaseIV);
552    // CF3 should be bottleneck for WAL
553    assertEquals(smallestSeqInRegionCurrentMemstorePhaseIV, smallestSeqCF3PhaseIV);
554
555    /*------------------------------------------------------------------------------*/
556    /* PHASE IV - Flush */
557    // Third Flush in Test!!!!!!!!!!!!!!!!!!!!!!
558    // Force flush to disk on all memstores (flush parameter true).
559    // CF1/CF3 all flushed to disk. Note that active sets of CF1 and CF3 are empty
560    region.flush(true);
561
562    /*------------------------------------------------------------------------------*/
563    /*------------------------------------------------------------------------------*/
564    /* PHASE V - collect sizes */
565    // Recalculate everything
566    MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
567    MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
568    MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
569    long smallestSeqInRegionCurrentMemstorePhaseV = getWAL(region)
570        .getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
571    long totalMemstoreSizePhaseV = region.getMemStoreDataSize();
572
573    /*------------------------------------------------------------------------------*/
574    /* PHASE V - validation */
575    assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
576    assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
577    assertEquals(0, cf2MemstoreSizePhaseV.getDataSize());
578    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseV.getHeapSize());
579    assertEquals(0, cf3MemstoreSizePhaseV.getDataSize());
580    assertEquals(MutableSegment.DEEP_OVERHEAD, cf3MemstoreSizePhaseV.getHeapSize());
581    // The total memstores size should be empty
582    assertEquals(0, totalMemstoreSizePhaseV);
583    // Because there is nothing in any memstore the WAL's LSN should be -1
584    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstorePhaseV);
585
586    // What happens when we hit the memstore limit, but we are not able to find
587    // any Column Family above the threshold?
588    // In that case, we should flush all the CFs.
589
590    /*------------------------------------------------------------------------------*/
591    /*------------------------------------------------------------------------------*/
592    /* PHASE VI - insertions */
593    // The memstore limit is 200*1024 and the column family flush threshold is
594    // around 50*1024. We try to just hit the memstore limit with each CF's
595    // memstore being below the CF flush threshold.
596    for (int i = 1; i <= 300; i++) {
597      region.put(createPut(1, i));
598      region.put(createPut(2, i));
599      region.put(createPut(3, i));
600      region.put(createPut(4, i));
601      region.put(createPut(5, i));
602    }
603
604    MemStoreSize cf1ActiveSizePhaseVI = region.getStore(FAMILY1).getMemStoreSize();
605    MemStoreSize cf3ActiveSizePhaseVI = region.getStore(FAMILY3).getMemStoreSize();
606    MemStoreSize cf5ActiveSizePhaseVI = region.getStore(FAMILIES[4]).getMemStoreSize();
607
608    /*------------------------------------------------------------------------------*/
609    /* PHASE VI - Flush */
610    // Fourth Flush in Test!!!!!!!!!!!!!!!!!!!!!!
611    // None among compacting memstores was flushed to memory due to previous puts.
612    // But is going to be moved to pipeline and flatten due to the flush.
613    region.flush(false);
614    // Since we won't find any CF above the threshold, and hence no specific
615    // store to flush, we should flush all the memstores
616    // Also compacted memstores are flushed to disk, but not entirely emptied
617    MemStoreSize cf1ActiveSizePhaseVII = region.getStore(FAMILY1).getMemStoreSize();
618    MemStoreSize cf3ActiveSizePhaseVII = region.getStore(FAMILY3).getMemStoreSize();
619    MemStoreSize cf5ActiveSizePhaseVII = region.getStore(FAMILIES[4]).getMemStoreSize();
620
621    assertTrue(cf1ActiveSizePhaseVII.getDataSize() < cf1ActiveSizePhaseVI.getDataSize());
622    assertTrue(cf3ActiveSizePhaseVII.getDataSize() < cf3ActiveSizePhaseVI.getDataSize());
623    assertTrue(cf5ActiveSizePhaseVII.getDataSize() < cf5ActiveSizePhaseVI.getDataSize());
624
625    HBaseTestingUtil.closeRegionAndWAL(region);
626  }
627
628  @Test
629  public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
630    // Set up the configuration
631    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
632    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
633    // set memstore to do data compaction and not to use the speculative scan
634    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
635        String.valueOf(MemoryCompactionPolicy.EAGER));
636
637    // Intialize the HRegion
638    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
639    verifyInMemoryFlushSize(region);
640    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
641    for (int i = 1; i <= 1200; i++) {
642      region.put(createPut(1, i));
643      if (i <= 100) {
644        region.put(createPut(2, i));
645        if (i <= 50) {
646          region.put(createPut(3, i));
647        }
648      }
649    }
650    // Now add more puts for CF2, so that we only flush CF2 to disk
651    for (int i = 100; i < 2000; i++) {
652      region.put(createPut(2, i));
653    }
654
655    // in this test check the non-composite snapshot - flashing only tail of the pipeline
656    ((CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore).setCompositeSnapshot(false);
657    ((CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore).setCompositeSnapshot(false);
658
659    long totalMemstoreSize = region.getMemStoreDataSize();
660
661    // Find the sizes of the memstores of each CF.
662    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
663    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
664    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
665
666    // Some other sanity checks.
667    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
668    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
669    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
670
671    // The total memstore size should be the same as the sum of the sizes of
672    // memstores of CF1, CF2 and CF3.
673    String msg = "totalMemstoreSize="+totalMemstoreSize +
674        " DefaultMemStore.DEEP_OVERHEAD="+DefaultMemStore.DEEP_OVERHEAD +
675        " cf1MemstoreSizePhaseI="+cf1MemstoreSizePhaseI +
676        " cf2MemstoreSizePhaseI="+cf2MemstoreSizePhaseI +
677        " cf3MemstoreSizePhaseI="+cf3MemstoreSizePhaseI ;
678    assertEquals(msg, totalMemstoreSize, cf1MemstoreSizePhaseI.getDataSize()
679        + cf2MemstoreSizePhaseI.getDataSize() + cf3MemstoreSizePhaseI.getDataSize());
680
681    // Flush!
682    CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
683    CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
684    cms1.flushInMemory();
685    cms3.flushInMemory();
686    region.flush(false);
687
688    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
689
690    long smallestSeqInRegionCurrentMemstorePhaseII =
691        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
692    long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
693    long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
694    long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
695
696    // CF2 should have been cleared
697    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
698    assertEquals(MutableSegment.DEEP_OVERHEAD, cf2MemstoreSizePhaseII.getHeapSize());
699
700    String s = "\n\n----------------------------------\n"
701        + "Upon initial insert and flush, LSN of CF1 is:"
702        + smallestSeqCF1PhaseII + ". LSN of CF2 is:"
703        + smallestSeqCF2PhaseII + ". LSN of CF3 is:"
704        + smallestSeqCF3PhaseII + ", smallestSeqInRegionCurrentMemstore:"
705        + smallestSeqInRegionCurrentMemstorePhaseII + "\n";
706
707    // Add same entries to compact them later
708    for (int i = 1; i <= 1200; i++) {
709      region.put(createPut(1, i));
710      if (i <= 100) {
711        region.put(createPut(2, i));
712        if (i <= 50) {
713          region.put(createPut(3, i));
714        }
715      }
716    }
717    // Now add more puts for CF2, so that we only flush CF2 to disk
718    for (int i = 100; i < 2000; i++) {
719      region.put(createPut(2, i));
720    }
721
722    long smallestSeqInRegionCurrentMemstorePhaseIII =
723        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
724    long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
725    long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
726    long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
727
728    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIII
729        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIII + ", " +
730        "the smallest sequence in CF2:"
731        + smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
732
733    // Flush!
734    cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
735    cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
736    cms1.flushInMemory();
737    cms3.flushInMemory();
738    region.flush(false);
739
740    long smallestSeqInRegionCurrentMemstorePhaseIV =
741        region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
742    long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
743    long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
744    long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
745
746    s = s + "The smallest sequence in region WAL is: " + smallestSeqInRegionCurrentMemstorePhaseIV
747        + ", the smallest sequence in CF1:" + smallestSeqCF1PhaseIV + ", " +
748        "the smallest sequence in CF2:"
749        + smallestSeqCF2PhaseIV +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIV + "\n";
750
751    // now check that the LSN of the entire WAL, of CF1 and of CF3 has progressed due to compaction
752    assertTrue(s, smallestSeqInRegionCurrentMemstorePhaseIV >
753        smallestSeqInRegionCurrentMemstorePhaseIII);
754    assertTrue(smallestSeqCF1PhaseIV > smallestSeqCF1PhaseIII);
755    assertTrue(smallestSeqCF3PhaseIV > smallestSeqCF3PhaseIII);
756
757    HBaseTestingUtil.closeRegionAndWAL(region);
758  }
759
760  @Test
761  public void testSelectiveFlushWithBasicAndMerge() throws IOException {
762    // Set up the configuration
763    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300 * 1024);
764    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 75 * 1024);
765    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.8);
766    // set memstore to do index compaction with merge
767    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
768        String.valueOf(MemoryCompactionPolicy.BASIC));
769    // length of pipeline that requires merge
770    conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
771
772    // Intialize the HRegion
773    HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
774    verifyInMemoryFlushSize(region);
775    // Add 1200 entries for CF1 (CompactingMemStore), 100 for CF2 (DefaultMemStore) and 50 for CF3
776    for (int i = 1; i <= 1200; i++) {
777      region.put(createPut(1, i));
778      if (i <= 100) {
779        region.put(createPut(2, i));
780        if (i <= 50) {
781          region.put(createPut(3, i));
782        }
783      }
784    }
785    // Now put more entries to CF2
786    for (int i = 100; i < 2000; i++) {
787      region.put(createPut(2, i));
788    }
789
790    long totalMemstoreSize = region.getMemStoreDataSize();
791
792    // test in-memory flashing into CAM here
793    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).setIndexType(
794        CompactingMemStore.IndexType.ARRAY_MAP);
795    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).setIndexType(
796        CompactingMemStore.IndexType.ARRAY_MAP);
797
798    // Find the sizes of the memstores of each CF.
799    MemStoreSize cf1MemstoreSizePhaseI = region.getStore(FAMILY1).getMemStoreSize();
800    MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
801    MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
802
803    // Some other sanity checks.
804    assertTrue(cf1MemstoreSizePhaseI.getDataSize() > 0);
805    assertTrue(cf2MemstoreSizePhaseI.getDataSize() > 0);
806    assertTrue(cf3MemstoreSizePhaseI.getDataSize() > 0);
807
808    // The total memstore size should be the same as the sum of the sizes of
809    // memstores of CF1, CF2 and CF3.
810    assertEquals(totalMemstoreSize,
811        cf1MemstoreSizePhaseI.getDataSize() + cf2MemstoreSizePhaseI.getDataSize()
812            + cf3MemstoreSizePhaseI.getDataSize());
813
814    // Initiate in-memory Flush!
815    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
816    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
817    // CF1 and CF3 should be flatten and merged so wait here to be sure the merge is done
818    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
819        .isMemStoreFlushingInMemory()) {
820      Threads.sleep(10);
821    }
822    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
823        .isMemStoreFlushingInMemory()) {
824      Threads.sleep(10);
825    }
826
827    // Flush-to-disk! CF2 only should be flushed
828    region.flush(false);
829
830    MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
831    MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
832    MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
833
834    // CF1 should be flushed in memory and just flattened, so CF1 heap overhead should be smaller
835    assertTrue(cf1MemstoreSizePhaseI.getHeapSize() > cf1MemstoreSizePhaseII.getHeapSize());
836    // CF1 should be flushed in memory and just flattened, so CF1 data size should remain the same
837    assertEquals(cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseII.getDataSize());
838    // CF2 should have been cleared
839    assertEquals(0, cf2MemstoreSizePhaseII.getDataSize());
840
841    // Add the same amount of entries to see the merging
842    for (int i = 1; i <= 1200; i++) {
843      region.put(createPut(1, i));
844      if (i <= 100) {
845        region.put(createPut(2, i));
846        if (i <= 50) {
847          region.put(createPut(3, i));
848        }
849      }
850    }
851    // Now add more puts for CF2, so that we only flush CF2 to disk
852    for (int i = 100; i < 2000; i++) {
853      region.put(createPut(2, i));
854    }
855
856    MemStoreSize cf1MemstoreSizePhaseIII = region.getStore(FAMILY1).getMemStoreSize();
857
858    // Flush in memory!
859    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
860    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
861    // CF1 and CF3 should be merged so wait here to be sure the merge is done
862    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
863        .isMemStoreFlushingInMemory()) {
864      Threads.sleep(10);
865    }
866    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
867        .isMemStoreFlushingInMemory()) {
868      Threads.sleep(10);
869    }
870    region.flush(false);
871
872    MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
873    MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
874
875    assertEquals(2*cf1MemstoreSizePhaseI.getDataSize(), cf1MemstoreSizePhaseIV.getDataSize());
876    // the decrease in the heap size due to usage of CellArrayMap instead of CSLM
877    // should be the same in flattening and in merge (first and second in-memory-flush)
878    // but in phase 1 we do not yet have immutable segment
879    assertEquals(
880        cf1MemstoreSizePhaseI.getHeapSize() - cf1MemstoreSizePhaseII.getHeapSize(),
881        cf1MemstoreSizePhaseIII.getHeapSize() - cf1MemstoreSizePhaseIV.getHeapSize()
882            - CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
883    assertEquals(3, // active, one in pipeline, snapshot
884        ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getSegments().size());
885    // CF2 should have been cleared
886    assertEquals("\n<<< DEBUG: The data--heap sizes of stores before/after first flushes,"
887            + " CF1: " + cf1MemstoreSizePhaseI.getDataSize() + "/" + cf1MemstoreSizePhaseII
888            .getDataSize() + "--" + cf1MemstoreSizePhaseI.getHeapSize() + "/" + cf1MemstoreSizePhaseII
889            .getHeapSize() + ", CF2: " + cf2MemstoreSizePhaseI.getDataSize() + "/"
890            + cf2MemstoreSizePhaseII.getDataSize() + "--" + cf2MemstoreSizePhaseI.getHeapSize() + "/"
891            + cf2MemstoreSizePhaseII.getHeapSize() + ", CF3: " + cf3MemstoreSizePhaseI.getDataSize()
892            + "/" + cf3MemstoreSizePhaseII.getDataSize() + "--" + cf3MemstoreSizePhaseI.getHeapSize()
893            + "/" + cf3MemstoreSizePhaseII.getHeapSize() + "\n<<< AND before/after second flushes "
894            + " CF1: " + cf1MemstoreSizePhaseIII.getDataSize() + "/" + cf1MemstoreSizePhaseIV
895            .getDataSize() + "--" + cf1MemstoreSizePhaseIII.getHeapSize() + "/" + cf1MemstoreSizePhaseIV
896            .getHeapSize() + "\n",
897        0, cf2MemstoreSizePhaseIV.getDataSize());
898
899    HBaseTestingUtil.closeRegionAndWAL(region);
900  }
901
902  // should end in 300 seconds (5 minutes)
903  @Test
904  public void testStressFlushAndWALinIndexCompaction() throws IOException {
905    // Set up the configuration
906    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 600 * 1024);
907    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
908        200 * 1024);
909    // set memstore to do data compaction and not to use the speculative scan
910    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
911        String.valueOf(MemoryCompactionPolicy.BASIC));
912
913    // Successfully initialize the HRegion
914    HRegion region = initHRegion("testSelectiveFlushAndWALinDataCompaction", conf);
915    verifyInMemoryFlushSize(region);
916    Thread[] threads = new Thread[25];
917    for (int i = 0; i < threads.length; i++) {
918      int id = i * 10000;
919      ConcurrentPutRunnable runnable = new ConcurrentPutRunnable(region, id);
920      threads[i] = new Thread(runnable);
921      threads[i].start();
922    }
923    Threads.sleep(10000); // let other threads start
924    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
925    Threads.sleep(10000); // let other threads continue
926    region.flush(true); // enforce flush of everything TO DISK while there are still ongoing puts
927
928    ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).flushInMemory();
929    ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).flushInMemory();
930    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore)
931        .isMemStoreFlushingInMemory()) {
932      Threads.sleep(10);
933    }
934    while (((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore)
935        .isMemStoreFlushingInMemory()) {
936      Threads.sleep(10);
937    }
938
939    for (int i = 0; i < threads.length; i++) {
940      try {
941        threads[i].join();
942      } catch (InterruptedException e) {
943        e.printStackTrace();
944      }
945    }
946  }
947
948  /**
949   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
950   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
951   * releases updatesLock and compacts the pipeline.
952   */
953  private class ConcurrentPutRunnable implements Runnable {
954    private final HRegion stressedRegion;
955    private final int startNumber;
956
957    ConcurrentPutRunnable(HRegion r, int i) {
958      this.stressedRegion = r;
959      this.startNumber = i;
960    }
961
962    @Override
963    public void run() {
964
965      try {
966        int dummy = startNumber / 10000;
967        System.out.print("Thread " + dummy + " with start number " + startNumber + " starts\n");
968        // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
969        for (int i = startNumber; i <= startNumber + 3000; i++) {
970          stressedRegion.put(createPut(1, i));
971          if (i <= startNumber + 2000) {
972            stressedRegion.put(createPut(2, i));
973            if (i <= startNumber + 1000) {
974              stressedRegion.put(createPut(3, i));
975            }
976          }
977        }
978        System.out.print("Thread with start number " + startNumber + " continues to more puts\n");
979        // Now add more puts for CF2, so that we only flush CF2 to disk
980        for (int i = startNumber + 3000; i < startNumber + 5000; i++) {
981          stressedRegion.put(createPut(2, i));
982        }
983        // And add more puts for CF1
984        for (int i = startNumber + 5000; i < startNumber + 7000; i++) {
985          stressedRegion.put(createPut(1, i));
986        }
987        System.out.print("Thread with start number " + startNumber + " flushes\n");
988        // flush (IN MEMORY) one of the stores (each thread flushes different store)
989        // and wait till the flush and the following action are done
990        if (startNumber == 0) {
991          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
992              .flushInMemory();
993          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY1)).memstore)
994              .isMemStoreFlushingInMemory()) {
995            Threads.sleep(10);
996          }
997        }
998        if (startNumber == 10000) {
999          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore).flushInMemory();
1000          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY2)).memstore)
1001              .isMemStoreFlushingInMemory()) {
1002            Threads.sleep(10);
1003          }
1004        }
1005        if (startNumber == 20000) {
1006          ((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore).flushInMemory();
1007          while (((CompactingMemStore) ((HStore) stressedRegion.getStore(FAMILY3)).memstore)
1008              .isMemStoreFlushingInMemory()) {
1009            Threads.sleep(10);
1010          }
1011        }
1012        System.out.print("Thread with start number " + startNumber + " finishes\n");
1013      } catch (IOException e) {
1014        assert false;
1015      }
1016    }
1017  }
1018
1019  private WAL getWAL(Region region) {
1020    return ((HRegion)region).getWAL();
1021  }
1022}